Bug 908296 - Upgrade psutil to version 1.0.1; rs=glandium

Archive obtained from
https://psutil.googlecode.com/files/psutil-1.0.1.tar.gz and extracted
over existing source code without modifications.
This commit is contained in:
Gregory Szorc
2013-08-22 23:36:57 -07:00
parent 94e359dda6
commit 9fb86d2ef8
53 changed files with 3201 additions and 669 deletions

View File

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -16,13 +16,10 @@ import os
import psutil
from psutil._compat import PY3
from test_psutil import DEVNULL
from test_psutil import (reap_children, get_test_subprocess, sh, which,
skipUnless)
from test_psutil import *
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
TOLERANCE = 200 * 1024 # 200 KB
MUSE_AVAILABLE = which('muse')
@@ -56,14 +53,6 @@ class BSDSpecificTestCase(unittest.TestCase):
def tearDown(self):
reap_children()
def assert_eq_w_tol(self, first, second, tolerance):
difference = abs(first - second)
if difference <= tolerance:
return
msg = '%r != %r within %r delta (%r difference)' \
% (first, second, tolerance, difference)
raise AssertionError(msg)
def test_BOOT_TIME(self):
s = sysctl('sysctl kern.boottime')
s = s[s.find(" sec = ") + 7:]
@@ -130,69 +119,98 @@ class BSDSpecificTestCase(unittest.TestCase):
syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE
self.assertEqual(psutil.virtual_memory().total, syst)
@retry_before_failing()
def test_vmem_active(self):
syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
self.assert_eq_w_tol(psutil.virtual_memory().active, syst, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().active, syst,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_inactive(self):
syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
self.assert_eq_w_tol(psutil.virtual_memory().inactive, syst, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_wired(self):
syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
self.assert_eq_w_tol(psutil.virtual_memory().wired, syst, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_cached(self):
syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
self.assert_eq_w_tol(psutil.virtual_memory().cached, syst, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_free(self):
syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
self.assert_eq_w_tol(psutil.virtual_memory().free, syst, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().free, syst,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_buffers(self):
syst = sysctl("vfs.bufspace")
self.assert_eq_w_tol(psutil.virtual_memory().buffers, syst, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
delta=TOLERANCE)
# --- virtual_memory(); tests against muse
@skipUnless(MUSE_AVAILABLE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
def test_total(self):
num = muse('Total')
self.assertEqual(psutil.virtual_memory().total, num)
@skipUnless(MUSE_AVAILABLE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
@retry_before_failing()
def test_active(self):
num = muse('Active')
self.assert_eq_w_tol(psutil.virtual_memory().active, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().active, num,
delta=TOLERANCE)
@skipUnless(MUSE_AVAILABLE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
@retry_before_failing()
def test_inactive(self):
num = muse('Inactive')
self.assert_eq_w_tol(psutil.virtual_memory().inactive, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
delta=TOLERANCE)
@skipUnless(MUSE_AVAILABLE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
@retry_before_failing()
def test_wired(self):
num = muse('Wired')
self.assert_eq_w_tol(psutil.virtual_memory().wired, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().wired, num,
delta=TOLERANCE)
@skipUnless(MUSE_AVAILABLE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
@retry_before_failing()
def test_cached(self):
num = muse('Cache')
self.assert_eq_w_tol(psutil.virtual_memory().cached, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().cached, num,
delta=TOLERANCE)
@skipUnless(MUSE_AVAILABLE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
@retry_before_failing()
def test_free(self):
num = muse('Free')
self.assert_eq_w_tol(psutil.virtual_memory().free, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().free, num,
delta=TOLERANCE)
@skipUnless(MUSE_AVAILABLE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
@retry_before_failing()
def test_buffers(self):
num = muse('Buffer')
self.assert_eq_w_tol(psutil.virtual_memory().buffers, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
delta=TOLERANCE)
if __name__ == '__main__':
def test_main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase))
unittest.TextTestRunner(verbosity=2).run(test_suite)
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
sys.exit(1)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -14,24 +14,16 @@ import time
import os
import re
from test_psutil import sh, get_test_subprocess
from test_psutil import *
from psutil._compat import PY3
import psutil
TOLERANCE = 200 * 1024 # 200 KB
class LinuxSpecificTestCase(unittest.TestCase):
def assert_eq_w_tol(self, first, second, tolerance):
difference = abs(first - second)
if difference <= tolerance:
return
msg = '%r != %r within %r delta (%r difference)' \
% (first, second, tolerance, difference)
raise AssertionError(msg)
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
reason="os.statvfs() function not available on this platform")
@skip_on_not_implemented()
def test_disks(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
@@ -79,40 +71,52 @@ class LinuxSpecificTestCase(unittest.TestCase):
total = int(lines[0].split()[1]) * 1024
self.assertEqual(total, psutil.virtual_memory().total)
@retry_before_failing()
def test_vmem_used(self):
lines = sh('free').split('\n')[1:]
used = int(lines[0].split()[2]) * 1024
self.assert_eq_w_tol(used, psutil.virtual_memory().used, TOLERANCE)
self.assertAlmostEqual(used, psutil.virtual_memory().used,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_free(self):
lines = sh('free').split('\n')[1:]
free = int(lines[0].split()[3]) * 1024
self.assert_eq_w_tol(free, psutil.virtual_memory().free, TOLERANCE)
self.assertAlmostEqual(free, psutil.virtual_memory().free,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_buffers(self):
lines = sh('free').split('\n')[1:]
buffers = int(lines[0].split()[5]) * 1024
self.assert_eq_w_tol(buffers, psutil.virtual_memory().buffers, TOLERANCE)
self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_cached(self):
lines = sh('free').split('\n')[1:]
cached = int(lines[0].split()[6]) * 1024
self.assert_eq_w_tol(cached, psutil.virtual_memory().cached, TOLERANCE)
self.assertAlmostEqual(cached, psutil.virtual_memory().cached,
delta=TOLERANCE)
def test_swapmem_total(self):
lines = sh('free').split('\n')[1:]
total = int(lines[2].split()[1]) * 1024
self.assertEqual(total, psutil.swap_memory().total)
@retry_before_failing()
def test_swapmem_used(self):
lines = sh('free').split('\n')[1:]
used = int(lines[2].split()[2]) * 1024
self.assert_eq_w_tol(used, psutil.swap_memory().used, TOLERANCE)
self.assertAlmostEqual(used, psutil.swap_memory().used,
delta=TOLERANCE)
@retry_before_failing()
def test_swapmem_free(self):
lines = sh('free').split('\n')[1:]
free = int(lines[2].split()[3]) * 1024
self.assert_eq_w_tol(free, psutil.swap_memory().free, TOLERANCE)
self.assertAlmostEqual(free, psutil.swap_memory().free,
delta=TOLERANCE)
def test_cpu_times(self):
fields = psutil.cpu_times()._fields
@@ -122,20 +126,25 @@ class LinuxSpecificTestCase(unittest.TestCase):
# guest >= 2.6.24
# guest_nice >= 3.2.0
if kernel_ver_info >= (2, 6, 11):
assert 'steal' in fields, fields
self.assertIn('steal', fields)
else:
assert 'steal' not in fields, fields
self.assertNotIn('steal', fields)
if kernel_ver_info >= (2, 6, 24):
assert 'guest' in fields, fields
self.assertIn('guest', fields)
else:
assert 'guest' not in fields, fields
self.assertNotIn('guest', fields)
if kernel_ver_info >= (3, 2, 0):
assert 'guest_nice' in fields, fields
self.assertIn('guest_nice', fields)
else:
assert 'guest_nice' not in fields, fields
self.assertNotIn('guest_nice', fields)
if __name__ == '__main__':
def test_main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase))
unittest.TextTestRunner(verbosity=2).run(test_suite)
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
sys.exit(1)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -16,11 +16,10 @@ import re
import psutil
from psutil._compat import PY3
from test_psutil import reap_children, get_test_subprocess, sh
from test_psutil import *
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
TOLERANCE = 500 * 1024 # 500 KB
def sysctl(cmdline):
@@ -55,14 +54,6 @@ class OSXSpecificTestCase(unittest.TestCase):
def tearDown(self):
reap_children()
def assert_eq_w_tol(self, first, second, tolerance):
difference = abs(first - second)
if difference <= tolerance:
return
msg = '%r != %r (tolerance=%r, difference=%s)' \
% (first, second, tolerance, difference)
raise AssertionError(msg)
def test_process_create_time(self):
cmdline = "ps -o lstart -p %s" %self.pid
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
@@ -108,21 +99,29 @@ class OSXSpecificTestCase(unittest.TestCase):
sysctl_hwphymem = sysctl('sysctl hw.memsize')
self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM)
@retry_before_failing()
def test_vmem_free(self):
num = vm_stat("free")
self.assert_eq_w_tol(psutil.virtual_memory().free, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().free, num,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_active(self):
num = vm_stat("active")
self.assert_eq_w_tol(psutil.virtual_memory().active, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().active, num,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_inactive(self):
num = vm_stat("inactive")
self.assert_eq_w_tol(psutil.virtual_memory().inactive, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_wired(self):
num = vm_stat("wired")
self.assert_eq_w_tol(psutil.virtual_memory().wired, num, TOLERANCE)
self.assertAlmostEqual(psutil.virtual_memory().wired, num,
delta=TOLERANCE)
# --- swap mem
@@ -146,7 +145,12 @@ class OSXSpecificTestCase(unittest.TestCase):
self.assertEqual(tot1, tot2)
if __name__ == '__main__':
def test_main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase))
unittest.TextTestRunner(verbosity=2).run(test_suite)
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
sys.exit(1)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -16,8 +16,7 @@ import datetime
import psutil
from psutil._compat import PY3
from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX,
BSD, skip_on_access_denied, sh, skipIf)
from test_psutil import *
def ps(cmd):
@@ -26,12 +25,15 @@ def ps(cmd):
"""
if not LINUX:
cmd = cmd.replace(" --no-headers ", " ")
if SUNOS:
cmd = cmd.replace("-o command", "-o comm")
cmd = cmd.replace("-o start", "-o stime")
p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
if PY3:
output = str(output, sys.stdout.encoding)
if not LINUX:
output = output.split('\n')[1]
output = output.split('\n')[1].strip()
try:
return int(output)
except ValueError:
@@ -96,13 +98,16 @@ class PosixSpecificTestCase(unittest.TestCase):
name_psutil = psutil.Process(self.pid).name.lower()
self.assertEqual(name_ps, name_psutil)
@skipIf(OSX or BSD)
@unittest.skipIf(OSX or BSD,
'ps -o start not available')
def test_process_create_time(self):
time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0]
time_psutil = psutil.Process(self.pid).create_time
time_psutil = datetime.datetime.fromtimestamp(
if SUNOS:
time_psutil = round(time_psutil)
time_psutil_tstamp = datetime.datetime.fromtimestamp(
time_psutil).strftime("%H:%M:%S")
self.assertEqual(time_ps, time_psutil)
self.assertEqual(time_ps, time_psutil_tstamp)
def test_process_exe(self):
ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
@@ -122,21 +127,28 @@ class PosixSpecificTestCase(unittest.TestCase):
def test_process_cmdline(self):
ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
if SUNOS:
# ps on Solaris only shows the first part of the cmdline
psutil_cmdline = psutil_cmdline.split(" ")[0]
self.assertEqual(ps_cmdline, psutil_cmdline)
@retry_before_failing()
def test_get_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIPE)
if SUNOS:
cmd = ["ps", "ax"]
else:
cmd = ["ps", "ax", "-o", "pid"]
p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
if PY3:
output = str(output, sys.stdout.encoding)
output = output.replace('PID', '')
p.wait()
pids_ps = []
for pid in output.split('\n'):
if pid:
pids_ps.append(int(pid.strip()))
for line in output.split('\n')[1:]:
if line:
pid = int(line.split()[0].strip())
pids_ps.append(pid)
# remove ps subprocess pid which is supposed to be dead in meantime
pids_ps.remove(p.pid)
pids_psutil = psutil.get_pid_list()
@@ -152,12 +164,15 @@ class PosixSpecificTestCase(unittest.TestCase):
[x for x in pids_ps if x not in pids_psutil]
self.fail("difference: " + str(difference))
# for some reason ifconfig -a does not report differente interfaces
# psutil does
@unittest.skipIf(SUNOS, "test not reliable on SUNOS")
def test_nic_names(self):
p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
if PY3:
output = str(output, sys.stdout.encoding)
for nic in psutil.network_io_counters(pernic=True).keys():
for nic in psutil.net_io_counters(pernic=True).keys():
for line in output.split():
if line.startswith(nic):
break
@@ -174,8 +189,50 @@ class PosixSpecificTestCase(unittest.TestCase):
self.assertTrue(u.name in users, u.name)
self.assertTrue(u.terminal in terminals, u.terminal)
def test_fds_open(self):
# Note: this fails from time to time; I'm keen on thinking
# it doesn't mean something is broken
def call(p, attr):
attr = getattr(p, name, None)
if attr is not None and callable(attr):
ret = attr()
else:
ret = attr
if __name__ == '__main__':
p = psutil.Process(os.getpid())
attrs = []
failures = []
for name in dir(psutil.Process):
if name.startswith('_') \
or name.startswith('set_') \
or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
'send_signal', 'wait', 'get_children', 'as_dict'):
continue
else:
try:
num1 = p.get_num_fds()
for x in range(2):
call(p, name)
num2 = p.get_num_fds()
except psutil.AccessDenied:
pass
else:
if abs(num2 - num1) > 1:
fail = "failure while processing Process.%s method " \
"(before=%s, after=%s)" % (name, num1, num2)
failures.append(fail)
if failures:
self.fail('\n' + '\n'.join(failures))
def test_main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
unittest.TextTestRunner(verbosity=2).run(test_suite)
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
sys.exit(1)

View File

@@ -0,0 +1,44 @@
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Sun OS specific tests. These are implicitly run by test_psutil.py."""
import psutil
from test_psutil import *
class SunOSSpecificTestCase(unittest.TestCase):
def test_swap_memory(self):
out = sh('swap -l -k')
lines = out.strip().split('\n')[1:]
if not lines:
raise ValueError('no swap device(s) configured')
total = free = 0
for line in lines:
line = line.split()
t, f = line[-2:]
t = t.replace('K', '')
f = f.replace('K', '')
total += int(int(t) * 1024)
free += int(int(f) * 1024)
used = total - free
psutil_swap = psutil.swap_memory()
self.assertEqual(psutil_swap.total, total)
self.assertEqual(psutil_swap.used, used)
self.assertEqual(psutil_swap.free, free)
def test_main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(SunOSSpecificTestCase))
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
sys.exit(1)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -11,8 +11,6 @@ import unittest
import platform
import signal
import time
import warnings
import atexit
import sys
import subprocess
import errno
@@ -21,23 +19,23 @@ import traceback
import psutil
import _psutil_mswindows
from psutil._compat import PY3, callable, long
from test_psutil import reap_children, get_test_subprocess, wait_for_pid, warn
from test_psutil import *
try:
import wmi
except ImportError:
err = sys.exc_info()[1]
atexit.register(warn, "Couldn't run wmi tests: %s" % str(err))
register_warning("Couldn't run wmi tests: %s" % str(err))
wmi = None
try:
import win32api
import win32con
except ImportError:
err = sys.exc_info()[1]
atexit.register(warn, "Couldn't run pywin32 tests: %s" % str(err))
register_warning("Couldn't run pywin32 tests: %s" % str(err))
win32api = None
class WindowsSpecificTestCase(unittest.TestCase):
def setUp(self):
@@ -78,7 +76,7 @@ class WindowsSpecificTestCase(unittest.TestCase):
out = p.communicate()[0]
if PY3:
out = str(out, sys.stdout.encoding)
nics = psutil.network_io_counters(pernic=True).keys()
nics = psutil.net_io_counters(pernic=True).keys()
for nic in nics:
if "pseudo-interface" in nic.replace(' ', '-').lower():
continue
@@ -149,6 +147,8 @@ class WindowsSpecificTestCase(unittest.TestCase):
# --- psutil namespace functions and constants tests
@unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'),
'NUMBER_OF_PROCESSORS env var is not available')
def test_NUM_CPUS(self):
num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
self.assertEqual(num_cpus, psutil.NUM_CPUS)
@@ -294,9 +294,9 @@ class TestDualProcessImplementation(unittest.TestCase):
def assert_ge_0(obj):
if isinstance(obj, tuple):
for value in obj:
assert value >= 0, value
self.assertGreaterEqual(value, 0)
elif isinstance(obj, (int, long, float)):
assert obj >= 0, obj
self.assertGreaterEqual(obj, 0)
else:
assert 0 # case not handled which needs to be fixed
@@ -306,17 +306,19 @@ class TestDualProcessImplementation(unittest.TestCase):
else:
if isinstance(ret2, (int, long, float)):
diff = abs(ret1 - ret2)
assert diff <= tolerance, diff
self.assertLessEqual(diff, tolerance)
elif isinstance(ret2, tuple):
for a, b in zip(ret1, ret2):
diff = abs(a - b)
assert diff <= tolerance, diff
self.assertLessEqual(diff, tolerance)
failures = []
for name, tolerance in self.fun_names:
meth1 = wrap_exceptions(getattr(_psutil_mswindows, name))
meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2'))
for p in psutil.process_iter():
if name == 'get_process_memory_info' and p.pid == os.getpid():
continue
#
try:
ret1 = meth1(p.pid)
@@ -359,8 +361,13 @@ class TestDualProcessImplementation(unittest.TestCase):
self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
if __name__ == '__main__':
def test_main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase))
test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation))
unittest.TextTestRunner(verbosity=2).run(test_suite)
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
sys.exit(1)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -17,13 +17,12 @@ import time
import socket
import threading
import types
import sys
import psutil
import psutil._common
from psutil._compat import PY3, callable, xrange
from test_psutil import POSIX, LINUX, WINDOWS, OSX, BSD, TESTFN
from test_psutil import (reap_children, skipUnless, skipIf, supports_ipv6,
safe_remove, get_test_subprocess)
from test_psutil import *
# disable cache for Process class properties
psutil._common.cached_property.enabled = False
@@ -93,7 +92,9 @@ class TestProcessObjectLeaks(Base):
for attr in [x for x in dir(self) if x.startswith('test')]:
if attr[5:] not in supported_attrs:
meth = getattr(self, attr)
@skipIf(True)
name = meth.__func__.__name__.replace('test_', '')
@unittest.skipIf(True,
"%s not supported on this platform" % name)
def test_(self):
pass
setattr(self, attr, types.MethodType(test_, self))
@@ -183,7 +184,7 @@ class TestProcessObjectLeaks(Base):
def test_terminal(self):
self.execute('terminal')
@skipUnless(WINDOWS)
@unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
def test_resume(self):
self.execute('resume')
@@ -206,12 +207,12 @@ class TestProcessObjectLeaks(Base):
f.close()
# OSX implementation is unbelievably slow
@skipIf(OSX)
@unittest.skipIf(OSX, "OSX implementation is too slow")
def test_get_memory_maps(self):
self.execute('get_memory_maps')
# Linux implementation is pure python so since it's slow we skip it
@skipIf(LINUX)
@unittest.skipIf(LINUX, "not worth being tested on Linux (pure python)")
def test_get_connections(self):
def create_socket(family, type):
sock = socket.socket(family, type)
@@ -232,8 +233,14 @@ class TestProcessObjectLeaks(Base):
s.bind(TESTFN)
s.listen(1)
socks.append(s)
kind = 'all'
# TODO: UNIX sockets are temporarily implemented by parsing
# 'pfiles' cmd output; we don't want that part of the code to
# be executed.
if SUNOS:
kind = 'inet'
try:
self.execute('get_connections', kind='all')
self.execute('get_connections', kind=kind)
finally:
for s in socks:
s.close()
@@ -279,13 +286,16 @@ class TestModuleFunctionsLeaks(Base):
if callable(obj):
retvalue = obj(*args, **kwargs)
@skipIf(POSIX)
@unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
def test_pid_exists(self):
self.execute('pid_exists', os.getpid())
def test_virtual_memory(self):
self.execute('virtual_memory')
# TODO: remove this skip when this gets fixed
@unittest.skipIf(SUNOS,
"not worth being tested on SUNOS (uses a subprocess)")
def test_swap_memory(self):
self.execute('swap_memory')
@@ -295,21 +305,22 @@ class TestModuleFunctionsLeaks(Base):
def test_per_cpu_times(self):
self.execute('cpu_times', percpu=True)
@skipUnless(WINDOWS)
@unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
def test_disk_usage(self):
self.execute('disk_usage', '.')
def test_disk_partitions(self):
self.execute('disk_partitions')
def test_network_io_counters(self):
self.execute('network_io_counters')
def test_net_io_counters(self):
self.execute('net_io_counters')
def test_disk_io_counters(self):
self.execute('disk_io_counters')
# XXX - on Windows this produces a false positive
@skipIf(WINDOWS)
@unittest.skipIf(WINDOWS,
"XXX produces a false positive on Windows")
def test_get_users(self):
self.execute('get_users')
@@ -321,7 +332,9 @@ def test_main():
TestModuleFunctionsLeaks,]
for test in tests:
test_suite.addTest(unittest.makeSuite(test))
unittest.TextTestRunner(verbosity=2).run(test_suite)
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
test_main()
if not test_main():
sys.exit(1)

File diff suppressed because it is too large Load Diff