Bug 783727 - Part 4: Add psutil Python package; r=glandium
Version 0.6.1 obtained from https://psutil.googlecode.com/files/psutil-0.6.1.tar.gz
This commit is contained in:
183
python/psutil/test/_posix.py
Executable file
183
python/psutil/test/_posix.py
Executable file
@@ -0,0 +1,183 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# $Id: _posix.py 1386 2012-06-27 15:44:36Z g.rodola $
|
||||
#
|
||||
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
"""POSIX specific tests. These are implicitly run by test_psutil.py."""
|
||||
|
||||
import unittest
|
||||
import subprocess
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import datetime
|
||||
|
||||
import psutil
|
||||
|
||||
from psutil._compat import PY3
|
||||
from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX,
|
||||
BSD, ignore_access_denied, sh, skipIf)
|
||||
|
||||
|
||||
def ps(cmd):
|
||||
"""Expects a ps command with a -o argument and parse the result
|
||||
returning only the value of interest.
|
||||
"""
|
||||
if not LINUX:
|
||||
cmd = cmd.replace(" --no-headers ", " ")
|
||||
p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
|
||||
output = p.communicate()[0].strip()
|
||||
if PY3:
|
||||
output = str(output, sys.stdout.encoding)
|
||||
if not LINUX:
|
||||
output = output.split('\n')[1]
|
||||
try:
|
||||
return int(output)
|
||||
except ValueError:
|
||||
return output
|
||||
|
||||
|
||||
class PosixSpecificTestCase(unittest.TestCase):
|
||||
"""Compare psutil results against 'ps' command line utility."""
|
||||
|
||||
# for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
|
||||
|
||||
def setUp(self):
|
||||
self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
|
||||
stdin=subprocess.PIPE).pid
|
||||
|
||||
def tearDown(self):
|
||||
reap_children()
|
||||
|
||||
def test_process_parent_pid(self):
|
||||
ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
|
||||
ppid_psutil = psutil.Process(self.pid).ppid
|
||||
self.assertEqual(ppid_ps, ppid_psutil)
|
||||
|
||||
def test_process_uid(self):
|
||||
uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
|
||||
uid_psutil = psutil.Process(self.pid).uids.real
|
||||
self.assertEqual(uid_ps, uid_psutil)
|
||||
|
||||
def test_process_gid(self):
|
||||
gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
|
||||
gid_psutil = psutil.Process(self.pid).gids.real
|
||||
self.assertEqual(gid_ps, gid_psutil)
|
||||
|
||||
def test_process_username(self):
|
||||
username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
|
||||
username_psutil = psutil.Process(self.pid).username
|
||||
self.assertEqual(username_ps, username_psutil)
|
||||
|
||||
@ignore_access_denied
|
||||
def test_process_rss_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
time.sleep(0.1)
|
||||
rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
|
||||
rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
|
||||
self.assertEqual(rss_ps, rss_psutil)
|
||||
|
||||
@ignore_access_denied
|
||||
def test_process_vsz_memory(self):
|
||||
# give python interpreter some time to properly initialize
|
||||
# so that the results are the same
|
||||
time.sleep(0.1)
|
||||
vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
|
||||
vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
|
||||
self.assertEqual(vsz_ps, vsz_psutil)
|
||||
|
||||
def test_process_name(self):
|
||||
# use command + arg since "comm" keyword not supported on all platforms
|
||||
name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
||||
# remove path if there is any, from the command
|
||||
name_ps = os.path.basename(name_ps).lower()
|
||||
name_psutil = psutil.Process(self.pid).name.lower()
|
||||
self.assertEqual(name_ps, name_psutil)
|
||||
|
||||
@skipIf(OSX or BSD)
|
||||
def test_process_create_time(self):
|
||||
time_ps = ps("ps --no-headers -o start -p %s" %self.pid).split(' ')[0]
|
||||
time_psutil = psutil.Process(self.pid).create_time
|
||||
time_psutil = datetime.datetime.fromtimestamp(
|
||||
time_psutil).strftime("%H:%M:%S")
|
||||
self.assertEqual(time_ps, time_psutil)
|
||||
|
||||
def test_process_exe(self):
|
||||
ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
||||
psutil_pathname = psutil.Process(self.pid).exe
|
||||
try:
|
||||
self.assertEqual(ps_pathname, psutil_pathname)
|
||||
except AssertionError:
|
||||
# certain platforms such as BSD are more accurate returning:
|
||||
# "/usr/local/bin/python2.7"
|
||||
# ...instead of:
|
||||
# "/usr/local/bin/python"
|
||||
# We do not want to consider this difference in accuracy
|
||||
# an error.
|
||||
adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
|
||||
self.assertEqual(ps_pathname, adjusted_ps_pathname)
|
||||
|
||||
def test_process_cmdline(self):
|
||||
ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
|
||||
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
|
||||
self.assertEqual(ps_cmdline, psutil_cmdline)
|
||||
|
||||
def test_get_pids(self):
|
||||
# Note: this test might fail if the OS is starting/killing
|
||||
# other processes in the meantime
|
||||
p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIPE)
|
||||
output = p.communicate()[0].strip()
|
||||
if PY3:
|
||||
output = str(output, sys.stdout.encoding)
|
||||
output = output.replace('PID', '')
|
||||
p.wait()
|
||||
pids_ps = []
|
||||
for pid in output.split('\n'):
|
||||
if pid:
|
||||
pids_ps.append(int(pid.strip()))
|
||||
# remove ps subprocess pid which is supposed to be dead in meantime
|
||||
pids_ps.remove(p.pid)
|
||||
pids_psutil = psutil.get_pid_list()
|
||||
pids_ps.sort()
|
||||
pids_psutil.sort()
|
||||
|
||||
# on OSX ps doesn't show pid 0
|
||||
if OSX and 0 not in pids_ps:
|
||||
pids_ps.insert(0, 0)
|
||||
|
||||
if pids_ps != pids_psutil:
|
||||
difference = [x for x in pids_psutil if x not in pids_ps] + \
|
||||
[x for x in pids_ps if x not in pids_psutil]
|
||||
self.fail("difference: " + str(difference))
|
||||
|
||||
def test_nic_names(self):
|
||||
p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
|
||||
output = p.communicate()[0].strip()
|
||||
if PY3:
|
||||
output = str(output, sys.stdout.encoding)
|
||||
for nic in psutil.network_io_counters(pernic=True).keys():
|
||||
for line in output.split():
|
||||
if line.startswith(nic):
|
||||
break
|
||||
else:
|
||||
self.fail("couldn't find %s nic in 'ifconfig -a' output" % nic)
|
||||
|
||||
def test_get_users(self):
|
||||
out = sh("who")
|
||||
lines = out.split('\n')
|
||||
users = [x.split()[0] for x in lines]
|
||||
self.assertEqual(len(users), len(psutil.get_users()))
|
||||
terminals = [x.split()[1] for x in lines]
|
||||
for u in psutil.get_users():
|
||||
self.assertTrue(u.name in users, u.name)
|
||||
self.assertTrue(u.terminal in terminals, u.terminal)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_suite = unittest.TestSuite()
|
||||
test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
|
||||
unittest.TextTestRunner(verbosity=2).run(test_suite)
|
||||
Reference in New Issue
Block a user