Bug 930808 - Upgrade to psutil 2.1.3; r=glandium
psutil 2.1.3 is replacing psutil 1.0.1. There are numerous bug fixes and feature enhancements in psutil worth obtaining. Source code was obtained from https://pypi.python.org/packages/source/p/psutil/psutil-2.1.3.tar.gz and uncompressed into python/psutil without modification except for the removal of the egg-info directory and the .travis.yml file.
This commit is contained in:
116
python/psutil/test/_posix.py
Executable file → Normal file
116
python/psutil/test/_posix.py
Executable file → Normal file
@@ -6,17 +6,19 @@
|
||||
|
||||
"""POSIX specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
import unittest
|
||||
import subprocess
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import datetime
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import psutil
|
||||
|
||||
from psutil._compat import PY3
|
||||
from test_psutil import *
|
||||
from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON
|
||||
from test_psutil import (get_test_subprocess, skip_on_access_denied,
|
||||
retry_before_failing, reap_children, sh, unittest,
|
||||
get_kernel_version, wait_for_pid)
|
||||
|
||||
|
||||
def ps(cmd):
|
||||
@@ -43,75 +45,82 @@ def ps(cmd):
|
||||
class PosixSpecificTestCase(unittest.TestCase):
|
||||
"""Compare psutil results against 'ps' command line utility."""
|
||||
|
||||
# for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
|
||||
stdin=subprocess.PIPE).pid
|
||||
wait_for_pid(cls.pid)
|
||||
|
||||
def setUp(self):
|
||||
self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
|
||||
stdin=subprocess.PIPE).pid
|
||||
|
||||
def tearDown(self):
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
reap_children()
|
||||
|
||||
# for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
|
||||
|
||||
def test_process_parent_pid(self):
|
||||
ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
|
||||
ppid_psutil = psutil.Process(self.pid).ppid
|
||||
ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
|
||||
ppid_psutil = psutil.Process(self.pid).ppid()
|
||||
self.assertEqual(ppid_ps, ppid_psutil)
|
||||
|
||||
def test_process_uid(self):
|
||||
uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
|
||||
uid_psutil = psutil.Process(self.pid).uids.real
|
||||
uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
|
||||
uid_psutil = psutil.Process(self.pid).uids().real
|
||||
self.assertEqual(uid_ps, uid_psutil)
|
||||
|
||||
def test_process_gid(self):
|
||||
gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
|
||||
gid_psutil = psutil.Process(self.pid).gids.real
|
||||
gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
|
||||
gid_psutil = psutil.Process(self.pid).gids().real
|
||||
self.assertEqual(gid_ps, gid_psutil)
|
||||
|
||||
def test_process_username(self):
|
||||
username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
|
||||
username_psutil = psutil.Process(self.pid).username
|
||||
username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
|
||||
username_psutil = psutil.Process(self.pid).username()
|
||||
self.assertEqual(username_ps, username_psutil)
|
||||
|
||||
@skip_on_access_denied()
|
||||
@retry_before_failing()
|
||||
def test_process_rss_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
time.sleep(0.1)
|
||||
rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
|
||||
rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
|
||||
rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
|
||||
rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
|
||||
self.assertEqual(rss_ps, rss_psutil)
|
||||
|
||||
@skip_on_access_denied()
|
||||
@retry_before_failing()
|
||||
def test_process_vsz_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
time.sleep(0.1)
|
||||
vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
|
||||
vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
|
||||
vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
|
||||
vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
|
||||
self.assertEqual(vsz_ps, vsz_psutil)
|
||||
|
||||
def test_process_name(self):
|
||||
# use command + arg since "comm" keyword not supported on all platforms
|
||||
name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
||||
name_ps = ps("ps --no-headers -o command -p %s" % (
|
||||
self.pid)).split(' ')[0]
|
||||
# remove path if there is any, from the command
|
||||
name_ps = os.path.basename(name_ps).lower()
|
||||
name_psutil = psutil.Process(self.pid).name.lower()
|
||||
name_psutil = psutil.Process(self.pid).name().lower()
|
||||
self.assertEqual(name_ps, name_psutil)
|
||||
|
||||
@unittest.skipIf(OSX or BSD,
|
||||
'ps -o start not available')
|
||||
'ps -o start not available')
|
||||
def test_process_create_time(self):
|
||||
time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0]
|
||||
time_psutil = psutil.Process(self.pid).create_time
|
||||
time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
|
||||
time_psutil = psutil.Process(self.pid).create_time()
|
||||
if SUNOS:
|
||||
time_psutil = round(time_psutil)
|
||||
time_psutil_tstamp = datetime.datetime.fromtimestamp(
|
||||
time_psutil).strftime("%H:%M:%S")
|
||||
time_psutil).strftime("%H:%M:%S")
|
||||
self.assertEqual(time_ps, time_psutil_tstamp)
|
||||
|
||||
def test_process_exe(self):
|
||||
ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
||||
psutil_pathname = psutil.Process(self.pid).exe
|
||||
ps_pathname = ps("ps --no-headers -o command -p %s" %
|
||||
self.pid).split(' ')[0]
|
||||
psutil_pathname = psutil.Process(self.pid).exe()
|
||||
try:
|
||||
self.assertEqual(ps_pathname, psutil_pathname)
|
||||
except AssertionError:
|
||||
@@ -125,15 +134,15 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
self.assertEqual(ps_pathname, adjusted_ps_pathname)
|
||||
|
||||
def test_process_cmdline(self):
|
||||
ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
|
||||
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
|
||||
ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
|
||||
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
|
||||
if SUNOS:
|
||||
# ps on Solaris only shows the first part of the cmdline
|
||||
psutil_cmdline = psutil_cmdline.split(" ")[0]
|
||||
self.assertEqual(ps_cmdline, psutil_cmdline)
|
||||
|
||||
@retry_before_failing()
|
||||
def test_get_pids(self):
|
||||
def test_pids(self):
|
||||
# Note: this test might fail if the OS is starting/killing
|
||||
# other processes in the meantime
|
||||
if SUNOS:
|
||||
@@ -151,7 +160,7 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
pids_ps.append(pid)
|
||||
# remove ps subprocess pid which is supposed to be dead in meantime
|
||||
pids_ps.remove(p.pid)
|
||||
pids_psutil = psutil.get_pid_list()
|
||||
pids_psutil = psutil.pids()
|
||||
pids_ps.sort()
|
||||
pids_psutil.sort()
|
||||
|
||||
@@ -179,13 +188,14 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
else:
|
||||
self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic)
|
||||
|
||||
def test_get_users(self):
|
||||
@retry_before_failing()
|
||||
def test_users(self):
|
||||
out = sh("who")
|
||||
lines = out.split('\n')
|
||||
users = [x.split()[0] for x in lines]
|
||||
self.assertEqual(len(users), len(psutil.get_users()))
|
||||
self.assertEqual(len(users), len(psutil.users()))
|
||||
terminals = [x.split()[1] for x in lines]
|
||||
for u in psutil.get_users():
|
||||
for u in psutil.users():
|
||||
self.assertTrue(u.name in users, u.name)
|
||||
self.assertTrue(u.terminal in terminals, u.terminal)
|
||||
|
||||
@@ -193,27 +203,35 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
# Note: this fails from time to time; I'm keen on thinking
|
||||
# it doesn't mean something is broken
|
||||
def call(p, attr):
|
||||
args = ()
|
||||
attr = getattr(p, name, None)
|
||||
if attr is not None and callable(attr):
|
||||
ret = attr()
|
||||
if name == 'rlimit':
|
||||
args = (psutil.RLIMIT_NOFILE,)
|
||||
elif name == 'set_rlimit':
|
||||
args = (psutil.RLIMIT_NOFILE, (5, 5))
|
||||
attr(*args)
|
||||
else:
|
||||
ret = attr
|
||||
attr
|
||||
|
||||
p = psutil.Process(os.getpid())
|
||||
attrs = []
|
||||
failures = []
|
||||
ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
|
||||
'send_signal', 'wait', 'children', 'as_dict']
|
||||
if LINUX and get_kernel_version() < (2, 6, 36):
|
||||
ignored_names.append('rlimit')
|
||||
for name in dir(psutil.Process):
|
||||
if name.startswith('_') \
|
||||
or name.startswith('set_') \
|
||||
or name in ('terminate', 'kill', 'suspend', 'resume', 'nice',
|
||||
'send_signal', 'wait', 'get_children', 'as_dict'):
|
||||
if (name.startswith('_')
|
||||
or name.startswith('set_')
|
||||
or name.startswith('get') # deprecated APIs
|
||||
or name in ignored_names):
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
num1 = p.get_num_fds()
|
||||
num1 = p.num_fds()
|
||||
for x in range(2):
|
||||
call(p, name)
|
||||
num2 = p.get_num_fds()
|
||||
num2 = p.num_fds()
|
||||
except psutil.AccessDenied:
|
||||
pass
|
||||
else:
|
||||
@@ -225,8 +243,6 @@ class PosixSpecificTestCase(unittest.TestCase):
|
||||
self.fail('\n' + '\n'.join(failures))
|
||||
|
||||
|
||||
|
||||
|
||||
def test_main():
|
||||
test_suite = unittest.TestSuite()
|
||||
test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
|
||||
|
||||
Reference in New Issue
Block a user