Backed out changeset 697eb6db7d96 (bug 930808) for OS X make check failures
This commit is contained in:
@@ -1,15 +0,0 @@
|
||||
- The recommended way to run tests (also on Windows) is to cd into parent
|
||||
directory and run:
|
||||
|
||||
make test
|
||||
|
||||
- If you're on Python < 2.7 unittest2 module must be installed first:
|
||||
https://pypi.python.org/pypi/unittest2
|
||||
|
||||
- The main test script is test_psutil.py, which also imports platform-specific
|
||||
_*.py scripts (which should be ignored).
|
||||
|
||||
- test_memory_leaks.py looks for memory leaks into C extension modules and must
|
||||
be run separately with:
|
||||
|
||||
make memtest
|
||||
@@ -4,27 +4,23 @@
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
# TODO: add test for comparing connections with 'sockstat' cmd
|
||||
|
||||
"""BSD specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
import unittest
|
||||
import subprocess
|
||||
import time
|
||||
import re
|
||||
import sys
|
||||
import os
|
||||
|
||||
import psutil
|
||||
|
||||
from psutil._compat import PY3
|
||||
from test_psutil import (TOLERANCE, sh, get_test_subprocess, which,
|
||||
retry_before_failing, reap_children, unittest)
|
||||
from test_psutil import *
|
||||
|
||||
|
||||
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
|
||||
if os.getuid() == 0: # muse requires root privileges
|
||||
MUSE_AVAILABLE = which('muse')
|
||||
else:
|
||||
MUSE_AVAILABLE = False
|
||||
MUSE_AVAILABLE = which('muse')
|
||||
|
||||
|
||||
def sysctl(cmdline):
|
||||
@@ -38,10 +34,9 @@ def sysctl(cmdline):
|
||||
except ValueError:
|
||||
return result
|
||||
|
||||
|
||||
def muse(field):
|
||||
"""Thin wrapper around 'muse' cmdline utility."""
|
||||
out = sh('muse')
|
||||
out = sh('muse', stderr=DEVNULL)
|
||||
for line in out.split('\n'):
|
||||
if line.startswith(field):
|
||||
break
|
||||
@@ -52,29 +47,27 @@ def muse(field):
|
||||
|
||||
class BSDSpecificTestCase(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.pid = get_test_subprocess().pid
|
||||
def setUp(self):
|
||||
self.pid = get_test_subprocess().pid
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
def tearDown(self):
|
||||
reap_children()
|
||||
|
||||
def test_boot_time(self):
|
||||
def test_BOOT_TIME(self):
|
||||
s = sysctl('sysctl kern.boottime')
|
||||
s = s[s.find(" sec = ") + 7:]
|
||||
s = s[:s.find(',')]
|
||||
btime = int(s)
|
||||
self.assertEqual(btime, psutil.boot_time())
|
||||
self.assertEqual(btime, psutil.BOOT_TIME)
|
||||
|
||||
def test_process_create_time(self):
|
||||
cmdline = "ps -o lstart -p %s" % self.pid
|
||||
cmdline = "ps -o lstart -p %s" %self.pid
|
||||
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
|
||||
output = p.communicate()[0]
|
||||
if PY3:
|
||||
output = str(output, sys.stdout.encoding)
|
||||
start_ps = output.replace('STARTED', '').strip()
|
||||
start_psutil = psutil.Process(self.pid).create_time()
|
||||
start_psutil = psutil.Process(self.pid).create_time
|
||||
start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
|
||||
time.localtime(start_psutil))
|
||||
self.assertEqual(start_ps, start_psutil)
|
||||
@@ -108,7 +101,7 @@ class BSDSpecificTestCase(unittest.TestCase):
|
||||
|
||||
def test_memory_maps(self):
|
||||
out = sh('procstat -v %s' % self.pid)
|
||||
maps = psutil.Process(self.pid).memory_maps(grouped=False)
|
||||
maps = psutil.Process(self.pid).get_memory_maps(grouped=False)
|
||||
lines = out.split('\n')[1:]
|
||||
while lines:
|
||||
line = lines.pop()
|
||||
|
||||
@@ -7,23 +7,22 @@
|
||||
"""Linux specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
from __future__ import division
|
||||
import os
|
||||
import re
|
||||
import unittest
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
|
||||
from test_psutil import POSIX, TOLERANCE, TRAVIS
|
||||
from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess,
|
||||
retry_before_failing, get_kernel_version, unittest)
|
||||
|
||||
from test_psutil import *
|
||||
from psutil._compat import PY3
|
||||
import psutil
|
||||
|
||||
|
||||
class LinuxSpecificTestCase(unittest.TestCase):
|
||||
|
||||
@unittest.skipIf(
|
||||
POSIX and not hasattr(os, 'statvfs'),
|
||||
reason="os.statvfs() function not available on this platform")
|
||||
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
|
||||
reason="os.statvfs() function not available on this platform")
|
||||
@skip_on_not_implemented()
|
||||
def test_disks(self):
|
||||
# test psutil.disk_usage() and psutil.disk_partitions()
|
||||
@@ -54,11 +53,9 @@ class LinuxSpecificTestCase(unittest.TestCase):
|
||||
sproc = get_test_subprocess()
|
||||
time.sleep(1)
|
||||
p = psutil.Process(sproc.pid)
|
||||
maps = p.memory_maps(grouped=False)
|
||||
maps = p.get_memory_maps(grouped=False)
|
||||
pmap = sh('pmap -x %s' % p.pid).split('\n')
|
||||
# get rid of header
|
||||
del pmap[0]
|
||||
del pmap[0]
|
||||
del pmap[0]; del pmap[0] # get rid of header
|
||||
while maps and pmap:
|
||||
this = maps.pop(0)
|
||||
other = pmap.pop(0)
|
||||
@@ -121,11 +118,13 @@ class LinuxSpecificTestCase(unittest.TestCase):
|
||||
self.assertAlmostEqual(free, psutil.swap_memory().free,
|
||||
delta=TOLERANCE)
|
||||
|
||||
@unittest.skipIf(TRAVIS, "unknown failure on travis")
|
||||
def test_cpu_times(self):
|
||||
fields = psutil.cpu_times()._fields
|
||||
kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0]
|
||||
kernel_ver = re.findall('\d.\d.\d', os.uname()[2])[0]
|
||||
kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
|
||||
# steal >= 2.6.11
|
||||
# guest >= 2.6.24
|
||||
# guest_nice >= 3.2.0
|
||||
if kernel_ver_info >= (2, 6, 11):
|
||||
self.assertIn('steal', fields)
|
||||
else:
|
||||
@@ -139,41 +138,6 @@ class LinuxSpecificTestCase(unittest.TestCase):
|
||||
else:
|
||||
self.assertNotIn('guest_nice', fields)
|
||||
|
||||
# --- tests for specific kernel versions
|
||||
|
||||
@unittest.skipUnless(
|
||||
get_kernel_version() >= (2, 6, 36),
|
||||
"prlimit() not available on this Linux kernel version")
|
||||
def test_prlimit_availability(self):
|
||||
# prlimit() should be available starting from kernel 2.6.36
|
||||
p = psutil.Process(os.getpid())
|
||||
p.rlimit(psutil.RLIMIT_NOFILE)
|
||||
# if prlimit() is supported *at least* these constants should
|
||||
# be available
|
||||
self.assertTrue(hasattr(psutil, "RLIM_INFINITY"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_AS"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_CORE"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_CPU"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_DATA"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_FSIZE"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_LOCKS"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_MEMLOCK"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_NOFILE"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_NPROC"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_RSS"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_STACK"))
|
||||
|
||||
@unittest.skipUnless(
|
||||
get_kernel_version() >= (3, 0),
|
||||
"prlimit constants not available on this Linux kernel version")
|
||||
def test_resource_consts_kernel_v(self):
|
||||
# more recent constants
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_MSGQUEUE"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_NICE"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_RTPRIO"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME"))
|
||||
self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING"))
|
||||
|
||||
|
||||
def test_main():
|
||||
test_suite = unittest.TestSuite()
|
||||
|
||||
@@ -6,17 +6,17 @@
|
||||
|
||||
"""OSX specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
import unittest
|
||||
import subprocess
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import psutil
|
||||
|
||||
from psutil._compat import PY3
|
||||
from test_psutil import (TOLERANCE, sh, get_test_subprocess, reap_children,
|
||||
retry_before_failing, unittest)
|
||||
from test_psutil import *
|
||||
|
||||
|
||||
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
|
||||
@@ -35,7 +35,6 @@ def sysctl(cmdline):
|
||||
except ValueError:
|
||||
return result
|
||||
|
||||
|
||||
def vm_stat(field):
|
||||
"""Wrapper around 'vm_stat' cmdline utility."""
|
||||
out = sh('vm_stat')
|
||||
@@ -49,22 +48,20 @@ def vm_stat(field):
|
||||
|
||||
class OSXSpecificTestCase(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.pid = get_test_subprocess().pid
|
||||
def setUp(self):
|
||||
self.pid = get_test_subprocess().pid
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
def tearDown(self):
|
||||
reap_children()
|
||||
|
||||
def test_process_create_time(self):
|
||||
cmdline = "ps -o lstart -p %s" % self.pid
|
||||
cmdline = "ps -o lstart -p %s" %self.pid
|
||||
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
|
||||
output = p.communicate()[0]
|
||||
if PY3:
|
||||
output = str(output, sys.stdout.encoding)
|
||||
start_ps = output.replace('STARTED', '').strip()
|
||||
start_psutil = psutil.Process(self.pid).create_time()
|
||||
start_psutil = psutil.Process(self.pid).create_time
|
||||
start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
|
||||
time.localtime(start_psutil))
|
||||
self.assertEqual(start_ps, start_psutil)
|
||||
@@ -100,7 +97,7 @@ class OSXSpecificTestCase(unittest.TestCase):
|
||||
|
||||
def test_vmem_total(self):
|
||||
sysctl_hwphymem = sysctl('sysctl hw.memsize')
|
||||
self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)
|
||||
self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM)
|
||||
|
||||
@retry_before_failing()
|
||||
def test_vmem_free(self):
|
||||
|
||||
116
python/psutil/test/_posix.py
Normal file → Executable file
116
python/psutil/test/_posix.py
Normal file → Executable file
@@ -6,19 +6,17 @@
|
||||
|
||||
"""POSIX specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
import datetime
|
||||
import os
|
||||
import unittest
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import datetime
|
||||
|
||||
import psutil
|
||||
|
||||
from psutil._compat import PY3
|
||||
from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON
|
||||
from test_psutil import (get_test_subprocess, skip_on_access_denied,
|
||||
retry_before_failing, reap_children, sh, unittest,
|
||||
get_kernel_version, wait_for_pid)
|
||||
from test_psutil import *
|
||||
|
||||
|
||||
def ps(cmd):
|
||||
@@ -45,82 +43,75 @@ def ps(cmd):
|
||||
class PosixSpecificTestCase(unittest.TestCase):
|
||||
"""Compare psutil results against 'ps' command line utility."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
|
||||
stdin=subprocess.PIPE).pid
|
||||
wait_for_pid(cls.pid)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
reap_children()
|
||||
|
||||
# for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
|
||||
|
||||
def setUp(self):
|
||||
self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
|
||||
stdin=subprocess.PIPE).pid
|
||||
|
||||
def tearDown(self):
|
||||
reap_children()
|
||||
|
||||
def test_process_parent_pid(self):
|
||||
ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
|
||||
ppid_psutil = psutil.Process(self.pid).ppid()
|
||||
ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
|
||||
ppid_psutil = psutil.Process(self.pid).ppid
|
||||
self.assertEqual(ppid_ps, ppid_psutil)
|
||||
|
||||
def test_process_uid(self):
|
||||
uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
|
||||
uid_psutil = psutil.Process(self.pid).uids().real
|
||||
uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
|
||||
uid_psutil = psutil.Process(self.pid).uids.real
|
||||
self.assertEqual(uid_ps, uid_psutil)
|
||||
|
||||
def test_process_gid(self):
|
||||
gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
|
||||
gid_psutil = psutil.Process(self.pid).gids().real
|
||||
gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
|
||||
gid_psutil = psutil.Process(self.pid).gids.real
|
||||
self.assertEqual(gid_ps, gid_psutil)
|
||||
|
||||
def test_process_username(self):
|
||||
username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
|
||||
username_psutil = psutil.Process(self.pid).username()
|
||||
username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
|
||||
username_psutil = psutil.Process(self.pid).username
|
||||
self.assertEqual(username_ps, username_psutil)
|
||||
|
||||
@skip_on_access_denied()
|
||||
@retry_before_failing()
|
||||
def test_process_rss_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
time.sleep(0.1)
|
||||
rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
|
||||
rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
|
||||
rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
|
||||
rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
|
||||
self.assertEqual(rss_ps, rss_psutil)
|
||||
|
||||
@skip_on_access_denied()
|
||||
@retry_before_failing()
|
||||
def test_process_vsz_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
time.sleep(0.1)
|
||||
vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
|
||||
vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
|
||||
vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
|
||||
vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
|
||||
self.assertEqual(vsz_ps, vsz_psutil)
|
||||
|
||||
def test_process_name(self):
|
||||
# use command + arg since "comm" keyword not supported on all platforms
|
||||
name_ps = ps("ps --no-headers -o command -p %s" % (
|
||||
self.pid)).split(' ')[0]
|
||||
name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
||||
# remove path if there is any, from the command
|
||||
name_ps = os.path.basename(name_ps).lower()
|
||||
name_psutil = psutil.Process(self.pid).name().lower()
|
||||
name_psutil = psutil.Process(self.pid).name.lower()
|
||||
self.assertEqual(name_ps, name_psutil)
|
||||
|
||||
@unittest.skipIf(OSX or BSD,
|
||||
'ps -o start not available')
|
||||
'ps -o start not available')
|
||||
def test_process_create_time(self):
|
||||
time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
|
||||
time_psutil = psutil.Process(self.pid).create_time()
|
||||
time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0]
|
||||
time_psutil = psutil.Process(self.pid).create_time
|
||||
if SUNOS:
|
||||
time_psutil = round(time_psutil)
|
||||
time_psutil_tstamp = datetime.datetime.fromtimestamp(
|
||||
time_psutil).strftime("%H:%M:%S")
|
||||
time_psutil).strftime("%H:%M:%S")
|
||||
self.assertEqual(time_ps, time_psutil_tstamp)
|
||||
|
||||
def test_process_exe(self):
|
||||
ps_pathname = ps("ps --no-headers -o command -p %s" %
|
||||
self.pid).split(' ')[0]
|
||||
psutil_pathname = psutil.Process(self.pid).exe()
|
||||
ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
||||
psutil_pathname = psutil.Process(self.pid).exe
|
||||
try:
|
||||
self.assertEqual(ps_pathname, psutil_pathname)
|
||||
except AssertionError:
|
||||
@@ -134,15 +125,15 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
self.assertEqual(ps_pathname, adjusted_ps_pathname)
|
||||
|
||||
def test_process_cmdline(self):
|
||||
ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
|
||||
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
|
||||
ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
|
||||
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
|
||||
if SUNOS:
|
||||
# ps on Solaris only shows the first part of the cmdline
|
||||
psutil_cmdline = psutil_cmdline.split(" ")[0]
|
||||
self.assertEqual(ps_cmdline, psutil_cmdline)
|
||||
|
||||
@retry_before_failing()
|
||||
def test_pids(self):
|
||||
def test_get_pids(self):
|
||||
# Note: this test might fail if the OS is starting/killing
|
||||
# other processes in the meantime
|
||||
if SUNOS:
|
||||
@@ -160,7 +151,7 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
pids_ps.append(pid)
|
||||
# remove ps subprocess pid which is supposed to be dead in meantime
|
||||
pids_ps.remove(p.pid)
|
||||
pids_psutil = psutil.pids()
|
||||
pids_psutil = psutil.get_pid_list()
|
||||
pids_ps.sort()
|
||||
pids_psutil.sort()
|
||||
|
||||
@@ -188,14 +179,13 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
else:
|
||||
self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic)
|
||||
|
||||
@retry_before_failing()
|
||||
def test_users(self):
|
||||
def test_get_users(self):
|
||||
out = sh("who")
|
||||
lines = out.split('\n')
|
||||
users = [x.split()[0] for x in lines]
|
||||
self.assertEqual(len(users), len(psutil.users()))
|
||||
self.assertEqual(len(users), len(psutil.get_users()))
|
||||
terminals = [x.split()[1] for x in lines]
|
||||
for u in psutil.users():
|
||||
for u in psutil.get_users():
|
||||
self.assertTrue(u.name in users, u.name)
|
||||
self.assertTrue(u.terminal in terminals, u.terminal)
|
||||
|
||||
@@ -203,35 +193,27 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
# Note: this fails from time to time; I'm keen on thinking
|
||||
# it doesn't mean something is broken
|
||||
def call(p, attr):
|
||||
args = ()
|
||||
attr = getattr(p, name, None)
|
||||
if attr is not None and callable(attr):
|
||||
if name == 'rlimit':
|
||||
args = (psutil.RLIMIT_NOFILE,)
|
||||
elif name == 'set_rlimit':
|
||||
args = (psutil.RLIMIT_NOFILE, (5, 5))
|
||||
attr(*args)
|
||||
ret = attr()
|
||||
else:
|
||||
attr
|
||||
ret = attr
|
||||
|
||||
p = psutil.Process(os.getpid())
|
||||
attrs = []
|
||||
failures = []
|
||||
ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
|
||||
'send_signal', 'wait', 'children', 'as_dict']
|
||||
if LINUX and get_kernel_version() < (2, 6, 36):
|
||||
ignored_names.append('rlimit')
|
||||
for name in dir(psutil.Process):
|
||||
if (name.startswith('_')
|
||||
or name.startswith('set_')
|
||||
or name.startswith('get') # deprecated APIs
|
||||
or name in ignored_names):
|
||||
if name.startswith('_') \
|
||||
or name.startswith('set_') \
|
||||
or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
|
||||
'send_signal', 'wait', 'get_children', 'as_dict'):
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
num1 = p.num_fds()
|
||||
num1 = p.get_num_fds()
|
||||
for x in range(2):
|
||||
call(p, name)
|
||||
num2 = p.num_fds()
|
||||
num2 = p.get_num_fds()
|
||||
except psutil.AccessDenied:
|
||||
pass
|
||||
else:
|
||||
@@ -243,6 +225,8 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
self.fail('\n' + '\n'.join(failures))
|
||||
|
||||
|
||||
|
||||
|
||||
def test_main():
|
||||
test_suite = unittest.TestSuite()
|
||||
test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
|
||||
|
||||
@@ -6,10 +6,8 @@
|
||||
|
||||
"""Sun OS specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
import sys
|
||||
|
||||
from test_psutil import sh, unittest
|
||||
import psutil
|
||||
from test_psutil import *
|
||||
|
||||
|
||||
class SunOSSpecificTestCase(unittest.TestCase):
|
||||
|
||||
@@ -6,55 +6,44 @@
|
||||
|
||||
"""Windows specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
import errno
|
||||
import os
|
||||
import unittest
|
||||
import platform
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import sys
|
||||
import subprocess
|
||||
import errno
|
||||
import traceback
|
||||
|
||||
from test_psutil import (get_test_subprocess, reap_children, unittest)
|
||||
import psutil
|
||||
import _psutil_mswindows
|
||||
from psutil._compat import PY3, callable, long
|
||||
from test_psutil import *
|
||||
|
||||
try:
|
||||
import wmi
|
||||
except ImportError:
|
||||
err = sys.exc_info()[1]
|
||||
register_warning("Couldn't run wmi tests: %s" % str(err))
|
||||
wmi = None
|
||||
try:
|
||||
import win32api
|
||||
import win32con
|
||||
except ImportError:
|
||||
win32api = win32con = None
|
||||
|
||||
from psutil._compat import PY3, callable, long
|
||||
from psutil._pswindows import ACCESS_DENIED_SET
|
||||
import _psutil_windows
|
||||
import psutil
|
||||
|
||||
|
||||
def wrap_exceptions(fun):
|
||||
def wrapper(self, *args, **kwargs):
|
||||
try:
|
||||
return fun(self, *args, **kwargs)
|
||||
except OSError:
|
||||
err = sys.exc_info()[1]
|
||||
if err.errno in ACCESS_DENIED_SET:
|
||||
raise psutil.AccessDenied(None, None)
|
||||
if err.errno == errno.ESRCH:
|
||||
raise psutil.NoSuchProcess(None, None)
|
||||
raise
|
||||
return wrapper
|
||||
err = sys.exc_info()[1]
|
||||
register_warning("Couldn't run pywin32 tests: %s" % str(err))
|
||||
win32api = None
|
||||
|
||||
|
||||
class WindowsSpecificTestCase(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.pid = get_test_subprocess().pid
|
||||
def setUp(self):
|
||||
sproc = get_test_subprocess()
|
||||
wait_for_pid(sproc.pid)
|
||||
self.pid = sproc.pid
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
def tearDown(self):
|
||||
reap_children()
|
||||
|
||||
def test_issue_24(self):
|
||||
@@ -63,14 +52,14 @@ class WindowsSpecificTestCase(unittest.TestCase):
|
||||
|
||||
def test_special_pid(self):
|
||||
p = psutil.Process(4)
|
||||
self.assertEqual(p.name(), 'System')
|
||||
self.assertEqual(p.name, 'System')
|
||||
# use __str__ to access all common Process properties to check
|
||||
# that nothing strange happens
|
||||
str(p)
|
||||
p.username()
|
||||
self.assertTrue(p.create_time() >= 0.0)
|
||||
p.username
|
||||
self.assertTrue(p.create_time >= 0.0)
|
||||
try:
|
||||
rss, vms = p.memory_info()
|
||||
rss, vms = p.get_memory_info()
|
||||
except psutil.AccessDenied:
|
||||
# expected on Windows Vista and Windows 7
|
||||
if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
|
||||
@@ -78,7 +67,7 @@ class WindowsSpecificTestCase(unittest.TestCase):
|
||||
else:
|
||||
self.assertTrue(rss > 0)
|
||||
|
||||
def test_send_signal(self):
|
||||
def test_signal(self):
|
||||
p = psutil.Process(self.pid)
|
||||
self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
|
||||
|
||||
@@ -92,205 +81,206 @@ class WindowsSpecificTestCase(unittest.TestCase):
|
||||
if "pseudo-interface" in nic.replace(' ', '-').lower():
|
||||
continue
|
||||
if nic not in out:
|
||||
self.fail(
|
||||
"%r nic wasn't found in 'ipconfig /all' output" % nic)
|
||||
self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic)
|
||||
|
||||
def test_exe(self):
|
||||
for p in psutil.process_iter():
|
||||
try:
|
||||
self.assertEqual(os.path.basename(p.exe()), p.name())
|
||||
self.assertEqual(os.path.basename(p.exe), p.name)
|
||||
except psutil.Error:
|
||||
pass
|
||||
|
||||
# --- Process class tests
|
||||
if wmi is not None:
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_process_name(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
self.assertEqual(p.name(), w.Caption)
|
||||
# --- Process class tests
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_process_exe(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
self.assertEqual(p.exe(), w.ExecutablePath)
|
||||
def test_process_name(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
self.assertEqual(p.name, w.Caption)
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_process_cmdline(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
self.assertEqual(' '.join(p.cmdline()),
|
||||
w.CommandLine.replace('"', ''))
|
||||
def test_process_exe(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
self.assertEqual(p.exe, w.ExecutablePath)
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_process_username(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
domain, _, username = w.GetOwner()
|
||||
username = "%s\\%s" % (domain, username)
|
||||
self.assertEqual(p.username(), username)
|
||||
def test_process_cmdline(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', ''))
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_process_rss_memory(self):
|
||||
time.sleep(0.1)
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
rss = p.memory_info().rss
|
||||
self.assertEqual(rss, int(w.WorkingSetSize))
|
||||
def test_process_username(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
domain, _, username = w.GetOwner()
|
||||
username = "%s\\%s" %(domain, username)
|
||||
self.assertEqual(p.username, username)
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_process_vms_memory(self):
|
||||
time.sleep(0.1)
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
vms = p.memory_info().vms
|
||||
# http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
|
||||
# ...claims that PageFileUsage is represented in Kilo
|
||||
# bytes but funnily enough on certain platforms bytes are
|
||||
# returned instead.
|
||||
wmi_usage = int(w.PageFileUsage)
|
||||
if (vms != wmi_usage) and (vms != wmi_usage * 1024):
|
||||
self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
|
||||
def test_process_rss_memory(self):
|
||||
time.sleep(0.1)
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
rss = p.get_memory_info().rss
|
||||
self.assertEqual(rss, int(w.WorkingSetSize))
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_process_create_time(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
wmic_create = str(w.CreationDate.split('.')[0])
|
||||
psutil_create = time.strftime("%Y%m%d%H%M%S",
|
||||
time.localtime(p.create_time()))
|
||||
self.assertEqual(wmic_create, psutil_create)
|
||||
def test_process_vms_memory(self):
|
||||
time.sleep(0.1)
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
vms = p.get_memory_info().vms
|
||||
# http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
|
||||
# ...claims that PageFileUsage is represented in Kilo
|
||||
# bytes but funnily enough on certain platforms bytes are
|
||||
# returned instead.
|
||||
wmi_usage = int(w.PageFileUsage)
|
||||
if (vms != wmi_usage) and (vms != wmi_usage * 1024):
|
||||
self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
|
||||
|
||||
# --- psutil namespace functions and constants tests
|
||||
def test_process_create_time(self):
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(self.pid)
|
||||
wmic_create = str(w.CreationDate.split('.')[0])
|
||||
psutil_create = time.strftime("%Y%m%d%H%M%S",
|
||||
time.localtime(p.create_time))
|
||||
self.assertEqual(wmic_create, psutil_create)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'),
|
||||
'NUMBER_OF_PROCESSORS env var is not available')
|
||||
def test_cpu_count(self):
|
||||
num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
|
||||
self.assertEqual(num_cpus, psutil.cpu_count())
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_total_phymem(self):
|
||||
w = wmi.WMI().Win32_ComputerSystem()[0]
|
||||
self.assertEqual(int(w.TotalPhysicalMemory),
|
||||
psutil.virtual_memory().total)
|
||||
# --- psutil namespace functions and constants tests
|
||||
|
||||
# @unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
# def test__UPTIME(self):
|
||||
# # _UPTIME constant is not public but it is used internally
|
||||
# # as value to return for pid 0 creation time.
|
||||
# # WMI behaves the same.
|
||||
# w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
# p = psutil.Process(0)
|
||||
# wmic_create = str(w.CreationDate.split('.')[0])
|
||||
# psutil_create = time.strftime("%Y%m%d%H%M%S",
|
||||
# time.localtime(p.create_time()))
|
||||
#
|
||||
@unittest.skipUnless(hasattr(os, 'NUMBER_OF_PROCESSORS'),
|
||||
'NUMBER_OF_PROCESSORS env var is not available')
|
||||
def test_NUM_CPUS(self):
|
||||
num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
|
||||
self.assertEqual(num_cpus, psutil.NUM_CPUS)
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_pids(self):
|
||||
# Note: this test might fail if the OS is starting/killing
|
||||
# other processes in the meantime
|
||||
w = wmi.WMI().Win32_Process()
|
||||
wmi_pids = [x.ProcessId for x in w]
|
||||
wmi_pids.sort()
|
||||
psutil_pids = psutil.pids()
|
||||
psutil_pids.sort()
|
||||
if wmi_pids != psutil_pids:
|
||||
difference = \
|
||||
filter(lambda x: x not in wmi_pids, psutil_pids) + \
|
||||
filter(lambda x: x not in psutil_pids, wmi_pids)
|
||||
self.fail("difference: " + str(difference))
|
||||
def test_TOTAL_PHYMEM(self):
|
||||
w = wmi.WMI().Win32_ComputerSystem()[0]
|
||||
self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM)
|
||||
|
||||
@unittest.skipIf(wmi is None, "wmi module is not installed")
|
||||
def test_disks(self):
|
||||
ps_parts = psutil.disk_partitions(all=True)
|
||||
wmi_parts = wmi.WMI().Win32_LogicalDisk()
|
||||
for ps_part in ps_parts:
|
||||
for wmi_part in wmi_parts:
|
||||
if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
|
||||
if not ps_part.mountpoint:
|
||||
# this is usually a CD-ROM with no disk inserted
|
||||
break
|
||||
try:
|
||||
usage = psutil.disk_usage(ps_part.mountpoint)
|
||||
except OSError:
|
||||
err = sys.exc_info()[1]
|
||||
if err.errno == errno.ENOENT:
|
||||
# usually this is the floppy
|
||||
def test__UPTIME(self):
|
||||
# _UPTIME constant is not public but it is used internally
|
||||
# as value to return for pid 0 creation time.
|
||||
# WMI behaves the same.
|
||||
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
|
||||
p = psutil.Process(0)
|
||||
wmic_create = str(w.CreationDate.split('.')[0])
|
||||
psutil_create = time.strftime("%Y%m%d%H%M%S",
|
||||
time.localtime(p.create_time))
|
||||
# XXX - ? no actual test here
|
||||
|
||||
def test_get_pids(self):
|
||||
# Note: this test might fail if the OS is starting/killing
|
||||
# other processes in the meantime
|
||||
w = wmi.WMI().Win32_Process()
|
||||
wmi_pids = [x.ProcessId for x in w]
|
||||
wmi_pids.sort()
|
||||
psutil_pids = psutil.get_pid_list()
|
||||
psutil_pids.sort()
|
||||
if wmi_pids != psutil_pids:
|
||||
difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \
|
||||
filter(lambda x:x not in psutil_pids, wmi_pids)
|
||||
self.fail("difference: " + str(difference))
|
||||
|
||||
def test_disks(self):
|
||||
ps_parts = psutil.disk_partitions(all=True)
|
||||
wmi_parts = wmi.WMI().Win32_LogicalDisk()
|
||||
for ps_part in ps_parts:
|
||||
for wmi_part in wmi_parts:
|
||||
if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
|
||||
if not ps_part.mountpoint:
|
||||
# this is usually a CD-ROM with no disk inserted
|
||||
break
|
||||
else:
|
||||
raise
|
||||
self.assertEqual(usage.total, int(wmi_part.Size))
|
||||
wmi_free = int(wmi_part.FreeSpace)
|
||||
self.assertEqual(usage.free, wmi_free)
|
||||
# 10 MB tollerance
|
||||
if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
|
||||
self.fail("psutil=%s, wmi=%s" % (
|
||||
usage.free, wmi_free))
|
||||
break
|
||||
else:
|
||||
self.fail("can't find partition %s" % repr(ps_part))
|
||||
|
||||
@unittest.skipIf(win32api is None, "pywin32 module is not installed")
|
||||
def test_num_handles(self):
|
||||
p = psutil.Process(os.getpid())
|
||||
before = p.num_handles()
|
||||
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
|
||||
win32con.FALSE, os.getpid())
|
||||
after = p.num_handles()
|
||||
self.assertEqual(after, before + 1)
|
||||
win32api.CloseHandle(handle)
|
||||
self.assertEqual(p.num_handles(), before)
|
||||
|
||||
@unittest.skipIf(win32api is None, "pywin32 module is not installed")
|
||||
def test_num_handles_2(self):
|
||||
# Note: this fails from time to time; I'm keen on thinking
|
||||
# it doesn't mean something is broken
|
||||
def call(p, attr):
|
||||
attr = getattr(p, name, None)
|
||||
if attr is not None and callable(attr):
|
||||
attr()
|
||||
else:
|
||||
attr
|
||||
|
||||
p = psutil.Process(self.pid)
|
||||
failures = []
|
||||
for name in dir(psutil.Process):
|
||||
if name.startswith('_') \
|
||||
or name.startswith('set_') \
|
||||
or name.startswith('get') \
|
||||
or name in ('terminate', 'kill', 'suspend', 'resume',
|
||||
'nice', 'send_signal', 'wait', 'children',
|
||||
'as_dict'):
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
call(p, name)
|
||||
num1 = p.num_handles()
|
||||
call(p, name)
|
||||
num2 = p.num_handles()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
try:
|
||||
usage = psutil.disk_usage(ps_part.mountpoint)
|
||||
except OSError:
|
||||
err = sys.exc_info()[1]
|
||||
if err.errno == errno.ENOENT:
|
||||
# usually this is the floppy
|
||||
break
|
||||
else:
|
||||
raise
|
||||
self.assertEqual(usage.total, int(wmi_part.Size))
|
||||
wmi_free = int(wmi_part.FreeSpace)
|
||||
self.assertEqual(usage.free, wmi_free)
|
||||
# 10 MB tollerance
|
||||
if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
|
||||
self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free)
|
||||
break
|
||||
else:
|
||||
if num2 > num1:
|
||||
fail = \
|
||||
"failure while processing Process.%s method " \
|
||||
"(before=%s, after=%s)" % (name, num1, num2)
|
||||
failures.append(fail)
|
||||
if failures:
|
||||
self.fail('\n' + '\n'.join(failures))
|
||||
self.fail("can't find partition %s" % repr(ps_part))
|
||||
|
||||
if win32api is not None:
|
||||
|
||||
def test_get_num_handles(self):
|
||||
p = psutil.Process(os.getpid())
|
||||
before = p.get_num_handles()
|
||||
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
|
||||
win32con.FALSE, os.getpid())
|
||||
after = p.get_num_handles()
|
||||
self.assertEqual(after, before+1)
|
||||
win32api.CloseHandle(handle)
|
||||
self.assertEqual(p.get_num_handles(), before)
|
||||
|
||||
def test_get_num_handles_2(self):
|
||||
# Note: this fails from time to time; I'm keen on thinking
|
||||
# it doesn't mean something is broken
|
||||
def call(p, attr):
|
||||
attr = getattr(p, name, None)
|
||||
if attr is not None and callable(attr):
|
||||
ret = attr()
|
||||
else:
|
||||
ret = attr
|
||||
|
||||
p = psutil.Process(self.pid)
|
||||
attrs = []
|
||||
failures = []
|
||||
for name in dir(psutil.Process):
|
||||
if name.startswith('_') \
|
||||
or name.startswith('set_') \
|
||||
or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
|
||||
'send_signal', 'wait', 'get_children', 'as_dict'):
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
call(p, name)
|
||||
num1 = p.get_num_handles()
|
||||
call(p, name)
|
||||
num2 = p.get_num_handles()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
else:
|
||||
if num2 > num1:
|
||||
fail = "failure while processing Process.%s method " \
|
||||
"(before=%s, after=%s)" % (name, num1, num2)
|
||||
failures.append(fail)
|
||||
if failures:
|
||||
self.fail('\n' + '\n'.join(failures))
|
||||
|
||||
|
||||
import _psutil_mswindows
|
||||
from psutil._psmswindows import ACCESS_DENIED_SET
|
||||
|
||||
def wrap_exceptions(callable):
|
||||
def wrapper(self, *args, **kwargs):
|
||||
try:
|
||||
return callable(self, *args, **kwargs)
|
||||
except OSError:
|
||||
err = sys.exc_info()[1]
|
||||
if err.errno in ACCESS_DENIED_SET:
|
||||
raise psutil.AccessDenied(None, None)
|
||||
if err.errno == errno.ESRCH:
|
||||
raise psutil.NoSuchProcess(None, None)
|
||||
raise
|
||||
return wrapper
|
||||
|
||||
class TestDualProcessImplementation(unittest.TestCase):
|
||||
fun_names = [
|
||||
# function name, tolerance
|
||||
('proc_cpu_times', 0.2),
|
||||
('proc_create_time', 0.5),
|
||||
('proc_num_handles', 1), # 1 because impl #1 opens a handle
|
||||
('proc_io_counters', 0),
|
||||
('proc_memory_info', 1024), # KB
|
||||
# function name tolerance
|
||||
('get_process_cpu_times', 0.2),
|
||||
('get_process_create_time', 0.5),
|
||||
('get_process_num_handles', 1), # 1 because impl #1 opens a handle
|
||||
('get_process_io_counters', 0),
|
||||
('get_process_memory_info', 1024), # KB
|
||||
]
|
||||
|
||||
def test_compare_values(self):
|
||||
@@ -300,7 +290,7 @@ class TestDualProcessImplementation(unittest.TestCase):
|
||||
# case the first fails because of limited permission error.
|
||||
# Here we test that the two methods return the exact same value,
|
||||
# see:
|
||||
# https://github.com/giampaolo/psutil/issues/304
|
||||
# http://code.google.com/p/psutil/issues/detail?id=304
|
||||
def assert_ge_0(obj):
|
||||
if isinstance(obj, tuple):
|
||||
for value in obj:
|
||||
@@ -324,10 +314,10 @@ class TestDualProcessImplementation(unittest.TestCase):
|
||||
|
||||
failures = []
|
||||
for name, tolerance in self.fun_names:
|
||||
meth1 = wrap_exceptions(getattr(_psutil_windows, name))
|
||||
meth2 = wrap_exceptions(getattr(_psutil_windows, name + '_2'))
|
||||
meth1 = wrap_exceptions(getattr(_psutil_mswindows, name))
|
||||
meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2'))
|
||||
for p in psutil.process_iter():
|
||||
if name == 'proc_memory_info' and p.pid == os.getpid():
|
||||
if name == 'get_process_memory_info' and p.pid == os.getpid():
|
||||
continue
|
||||
#
|
||||
try:
|
||||
@@ -353,9 +343,10 @@ class TestDualProcessImplementation(unittest.TestCase):
|
||||
assert_ge_0(ret1)
|
||||
assert_ge_0(ret2)
|
||||
except AssertionError:
|
||||
err = sys.exc_info()[1]
|
||||
trace = traceback.format_exc()
|
||||
msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % (
|
||||
trace, p.pid, name, ret1, ret2)
|
||||
msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \
|
||||
% (trace, p.pid, name, ret1, ret2)
|
||||
failures.append(msg)
|
||||
break
|
||||
if failures:
|
||||
@@ -364,9 +355,9 @@ class TestDualProcessImplementation(unittest.TestCase):
|
||||
def test_zombies(self):
|
||||
# test that NPS is raised by the 2nd implementation in case a
|
||||
# process no longer exists
|
||||
ZOMBIE_PID = max(psutil.pids()) + 5000
|
||||
ZOMBIE_PID = max(psutil.get_pid_list()) + 5000
|
||||
for name, _ in self.fun_names:
|
||||
meth = wrap_exceptions(getattr(_psutil_windows, name))
|
||||
meth = wrap_exceptions(getattr(_psutil_mswindows, name))
|
||||
self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
|
||||
|
||||
|
||||
|
||||
@@ -10,39 +10,29 @@ functions many times and compare process memory usage before and
|
||||
after the calls. It might produce false positives.
|
||||
"""
|
||||
|
||||
import gc
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import gc
|
||||
import unittest
|
||||
import time
|
||||
|
||||
if sys.version_info < (2, 7):
|
||||
import unittest2 as unittest # https://pypi.python.org/pypi/unittest2
|
||||
else:
|
||||
import unittest
|
||||
import socket
|
||||
import threading
|
||||
import types
|
||||
import sys
|
||||
|
||||
import psutil
|
||||
import psutil._common
|
||||
from psutil._compat import PY3, callable, xrange
|
||||
from test_psutil import *
|
||||
|
||||
from psutil._compat import callable, xrange
|
||||
from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, TESTFN,
|
||||
RLIMIT_SUPPORT)
|
||||
from test_psutil import (reap_children, supports_ipv6, safe_remove,
|
||||
get_test_subprocess)
|
||||
|
||||
# disable cache for Process class properties
|
||||
psutil._common.cached_property.enabled = False
|
||||
|
||||
LOOPS = 1000
|
||||
TOLERANCE = 4096
|
||||
SKIP_PYTHON_IMPL = True
|
||||
|
||||
|
||||
def skip_if_linux():
|
||||
return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
|
||||
"not worth being tested on LINUX (pure python)")
|
||||
|
||||
|
||||
class Base(unittest.TestCase):
|
||||
|
||||
proc = psutil.Process(os.getpid())
|
||||
|
||||
def execute(self, function, *args, **kwargs):
|
||||
@@ -82,11 +72,11 @@ class Base(unittest.TestCase):
|
||||
rss3 = self.get_mem()
|
||||
difference = rss3 - rss2
|
||||
if rss3 > rss2:
|
||||
self.fail("rss2=%s, rss3=%s, difference=%s"
|
||||
self.fail("rss2=%s, rss3=%s, difference=%s" \
|
||||
% (rss2, rss3, difference))
|
||||
|
||||
def get_mem(self):
|
||||
return psutil.Process(os.getpid()).memory_info()[0]
|
||||
return psutil.Process(os.getpid()).get_memory_info()[0]
|
||||
|
||||
def call(self, *args, **kwargs):
|
||||
raise NotImplementedError("must be implemented in subclass")
|
||||
@@ -95,6 +85,20 @@ class Base(unittest.TestCase):
|
||||
class TestProcessObjectLeaks(Base):
|
||||
"""Test leaks of Process class methods and properties"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Base.__init__(self, *args, **kwargs)
|
||||
# skip tests which are not supported by Process API
|
||||
supported_attrs = dir(psutil.Process)
|
||||
for attr in [x for x in dir(self) if x.startswith('test')]:
|
||||
if attr[5:] not in supported_attrs:
|
||||
meth = getattr(self, attr)
|
||||
name = meth.__func__.__name__.replace('test_', '')
|
||||
@unittest.skipIf(True,
|
||||
"%s not supported on this platform" % name)
|
||||
def test_(self):
|
||||
pass
|
||||
setattr(self, attr, types.MethodType(test_, self))
|
||||
|
||||
def setUp(self):
|
||||
gc.collect()
|
||||
|
||||
@@ -109,153 +113,107 @@ class TestProcessObjectLeaks(Base):
|
||||
except psutil.Error:
|
||||
pass
|
||||
|
||||
@skip_if_linux()
|
||||
def test_name(self):
|
||||
self.execute('name')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_cmdline(self):
|
||||
self.execute('cmdline')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_exe(self):
|
||||
self.execute('exe')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_ppid(self):
|
||||
self.execute('ppid')
|
||||
|
||||
@unittest.skipUnless(POSIX, "POSIX only")
|
||||
@skip_if_linux()
|
||||
def test_uids(self):
|
||||
self.execute('uids')
|
||||
|
||||
@unittest.skipUnless(POSIX, "POSIX only")
|
||||
@skip_if_linux()
|
||||
def test_gids(self):
|
||||
self.execute('gids')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_status(self):
|
||||
self.execute('status')
|
||||
|
||||
def test_nice_get(self):
|
||||
self.execute('nice')
|
||||
def test_get_nice(self):
|
||||
self.execute('get_nice')
|
||||
|
||||
def test_nice_set(self):
|
||||
niceness = psutil.Process(os.getpid()).nice()
|
||||
self.execute('nice', niceness)
|
||||
def test_set_nice(self):
|
||||
niceness = psutil.Process(os.getpid()).get_nice()
|
||||
self.execute('set_nice', niceness)
|
||||
|
||||
@unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
|
||||
"Linux and Windows Vista only")
|
||||
def test_ionice_get(self):
|
||||
self.execute('ionice')
|
||||
def test_get_io_counters(self):
|
||||
self.execute('get_io_counters')
|
||||
|
||||
@unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
|
||||
"Linux and Windows Vista only")
|
||||
def test_ionice_set(self):
|
||||
def test_get_ionice(self):
|
||||
self.execute('get_ionice')
|
||||
|
||||
def test_set_ionice(self):
|
||||
if WINDOWS:
|
||||
value = psutil.Process(os.getpid()).ionice()
|
||||
self.execute('ionice', value)
|
||||
value = psutil.Process(os.getpid()).get_ionice()
|
||||
self.execute('set_ionice', value)
|
||||
else:
|
||||
self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
|
||||
|
||||
@unittest.skipIf(OSX, "feature not supported on this platform")
|
||||
@skip_if_linux()
|
||||
def test_io_counters(self):
|
||||
self.execute('io_counters')
|
||||
self.execute('set_ionice', psutil.IOPRIO_CLASS_NONE)
|
||||
|
||||
def test_username(self):
|
||||
self.execute('username')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_create_time(self):
|
||||
self.execute('create_time')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_num_threads(self):
|
||||
self.execute('num_threads')
|
||||
def test_get_num_threads(self):
|
||||
self.execute('get_num_threads')
|
||||
|
||||
@unittest.skipUnless(WINDOWS, "Windows only")
|
||||
def test_num_handles(self):
|
||||
self.execute('num_handles')
|
||||
def test_get_num_handles(self):
|
||||
self.execute('get_num_handles')
|
||||
|
||||
@unittest.skipUnless(POSIX, "POSIX only")
|
||||
@skip_if_linux()
|
||||
def test_num_fds(self):
|
||||
self.execute('num_fds')
|
||||
def test_get_num_fds(self):
|
||||
self.execute('get_num_fds')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_threads(self):
|
||||
self.execute('threads')
|
||||
def test_get_threads(self):
|
||||
self.execute('get_threads')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_cpu_times(self):
|
||||
self.execute('cpu_times')
|
||||
def test_get_cpu_times(self):
|
||||
self.execute('get_cpu_times')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_memory_info(self):
|
||||
self.execute('memory_info')
|
||||
def test_get_memory_info(self):
|
||||
self.execute('get_memory_info')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_memory_info_ex(self):
|
||||
self.execute('memory_info_ex')
|
||||
def test_get_ext_memory_info(self):
|
||||
self.execute('get_ext_memory_info')
|
||||
|
||||
@unittest.skipUnless(POSIX, "POSIX only")
|
||||
@skip_if_linux()
|
||||
def test_terminal(self):
|
||||
self.execute('terminal')
|
||||
|
||||
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
|
||||
"not worth being tested on POSIX (pure python)")
|
||||
@unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
|
||||
def test_resume(self):
|
||||
self.execute('resume')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_cwd(self):
|
||||
self.execute('cwd')
|
||||
def test_getcwd(self):
|
||||
self.execute('getcwd')
|
||||
|
||||
@unittest.skipUnless(WINDOWS or LINUX, "Windows or Linux only")
|
||||
def test_cpu_affinity_get(self):
|
||||
self.execute('cpu_affinity')
|
||||
def test_get_cpu_affinity(self):
|
||||
self.execute('get_cpu_affinity')
|
||||
|
||||
@unittest.skipUnless(WINDOWS or LINUX, "Windows or Linux only")
|
||||
def test_cpu_affinity_set(self):
|
||||
affinity = psutil.Process(os.getpid()).cpu_affinity()
|
||||
self.execute('cpu_affinity', affinity)
|
||||
def test_set_cpu_affinity(self):
|
||||
affinity = psutil.Process(os.getpid()).get_cpu_affinity()
|
||||
self.execute('set_cpu_affinity', affinity)
|
||||
|
||||
@skip_if_linux()
|
||||
def test_open_files(self):
|
||||
def test_get_open_files(self):
|
||||
safe_remove(TESTFN) # needed after UNIX socket test has run
|
||||
f = open(TESTFN, 'w')
|
||||
try:
|
||||
self.execute('open_files')
|
||||
self.execute('get_open_files')
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
# OSX implementation is unbelievably slow
|
||||
@unittest.skipIf(OSX, "OSX implementation is too slow")
|
||||
@skip_if_linux()
|
||||
def test_memory_maps(self):
|
||||
self.execute('memory_maps')
|
||||
def test_get_memory_maps(self):
|
||||
self.execute('get_memory_maps')
|
||||
|
||||
@unittest.skipUnless(LINUX, "Linux only")
|
||||
@unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
|
||||
"only available on Linux >= 2.6.36")
|
||||
def test_rlimit_get(self):
|
||||
self.execute('rlimit', psutil.RLIMIT_NOFILE)
|
||||
|
||||
@unittest.skipUnless(LINUX, "Linux only")
|
||||
@unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
|
||||
"only available on Linux >= 2.6.36")
|
||||
def test_rlimit_set(self):
|
||||
limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
|
||||
self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
|
||||
|
||||
@skip_if_linux()
|
||||
# Windows implementation is based on a single system-wide function
|
||||
@unittest.skipIf(WINDOWS, "tested later")
|
||||
def test_connections(self):
|
||||
# Linux implementation is pure python so since it's slow we skip it
|
||||
@unittest.skipIf(LINUX, "not worth being tested on Linux (pure python)")
|
||||
def test_get_connections(self):
|
||||
def create_socket(family, type):
|
||||
sock = socket.socket(family, type)
|
||||
sock.bind(('', 0))
|
||||
@@ -282,7 +240,7 @@ class TestProcessObjectLeaks(Base):
|
||||
if SUNOS:
|
||||
kind = 'inet'
|
||||
try:
|
||||
self.execute('connections', kind=kind)
|
||||
self.execute('get_connections', kind=kind)
|
||||
finally:
|
||||
for s in socks:
|
||||
s.close()
|
||||
@@ -294,7 +252,6 @@ DEAD_PROC.kill()
|
||||
DEAD_PROC.wait()
|
||||
del p
|
||||
|
||||
|
||||
class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
|
||||
"""Same as above but looks for leaks occurring when dealing with
|
||||
zombie processes raising NoSuchProcess exception.
|
||||
@@ -327,24 +284,9 @@ class TestModuleFunctionsLeaks(Base):
|
||||
def call(self, function, *args, **kwargs):
|
||||
obj = getattr(psutil, function)
|
||||
if callable(obj):
|
||||
obj(*args, **kwargs)
|
||||
retvalue = obj(*args, **kwargs)
|
||||
|
||||
@skip_if_linux()
|
||||
def test_cpu_count_logical(self):
|
||||
psutil.cpu_count = psutil._psplatform.cpu_count_logical
|
||||
self.execute('cpu_count')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_cpu_count_physical(self):
|
||||
psutil.cpu_count = psutil._psplatform.cpu_count_physical
|
||||
self.execute('cpu_count')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_boot_time(self):
|
||||
self.execute('boot_time')
|
||||
|
||||
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
|
||||
"not worth being tested on POSIX (pure python)")
|
||||
@unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
|
||||
def test_pid_exists(self):
|
||||
self.execute('pid_exists', os.getpid())
|
||||
|
||||
@@ -357,48 +299,37 @@ class TestModuleFunctionsLeaks(Base):
|
||||
def test_swap_memory(self):
|
||||
self.execute('swap_memory')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_cpu_times(self):
|
||||
self.execute('cpu_times')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_per_cpu_times(self):
|
||||
self.execute('cpu_times', percpu=True)
|
||||
|
||||
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
|
||||
"not worth being tested on POSIX (pure python)")
|
||||
@unittest.skipIf(POSIX, "not worth being tested on POSIX (pure python)")
|
||||
def test_disk_usage(self):
|
||||
self.execute('disk_usage', '.')
|
||||
|
||||
def test_disk_partitions(self):
|
||||
self.execute('disk_partitions')
|
||||
|
||||
@skip_if_linux()
|
||||
def test_net_io_counters(self):
|
||||
self.execute('net_io_counters')
|
||||
|
||||
@unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
|
||||
'/proc/diskstats not available on this Linux version')
|
||||
@skip_if_linux()
|
||||
def test_disk_io_counters(self):
|
||||
self.execute('disk_io_counters')
|
||||
|
||||
# XXX - on Windows this produces a false positive
|
||||
@unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
|
||||
def test_users(self):
|
||||
self.execute('users')
|
||||
|
||||
@unittest.skipIf(LINUX,
|
||||
"not worth being tested on Linux (pure python)")
|
||||
def test_net_connections(self):
|
||||
self.execute('net_connections')
|
||||
@unittest.skipIf(WINDOWS,
|
||||
"XXX produces a false positive on Windows")
|
||||
def test_get_users(self):
|
||||
self.execute('get_users')
|
||||
|
||||
|
||||
def test_main():
|
||||
test_suite = unittest.TestSuite()
|
||||
tests = [TestProcessObjectLeaksZombie,
|
||||
TestProcessObjectLeaks,
|
||||
TestModuleFunctionsLeaks]
|
||||
TestModuleFunctionsLeaks,]
|
||||
for test in tests:
|
||||
test_suite.addTest(unittest.makeSuite(test))
|
||||
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user