# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

# flake8: noqa: F821

import fcntl
import os
import select
import time
from subprocess import PIPE, Popen


class TaskPool:
    # Run a series of subprocesses. Try to keep up to a certain number going in
    # parallel at any given time. Enforce time limits.
    #
    # This is implemented using non-blocking I/O, and so is Unix-specific.
    #
    # We assume that, if a task closes its standard error, then it's safe to
    # wait for it to terminate. So an ill-behaved task that closes its standard
    # error and then hangs will hang us, as well. However, as it takes special
    # effort to close one's standard error, this seems unlikely to be a
    # problem in practice.

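    # A minimal usage sketch (illustrative only; the 'sleep sort' test at the
    # bottom of this file is the full example). EchoTask is a hypothetical
    # subclass, not something defined in this module:
    #
    #   class EchoTask(TaskPool.Task):
    #       def cmd(self):
    #           return ["echo", "hello"]
    #       def onStdout(self, text):
    #           print(text, end="")
    #       def onStderr(self, text):
    #           pass
    #       def onFinished(self, returncode):
    #           pass
    #       def onTimeout(self):
    #           pass
    #
    #   TaskPool([EchoTask() for _ in range(4)], job_limit=get_cpu_count()).run_all()
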
    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task:
        def __init__(self):
            self.pipe = None
            self.start_time = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

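    # Illustrative sketch of TerminateTask in a handler (EarlyExitTask is
    # hypothetical, not defined in this module): raising it makes run_all()
    # kill the subprocess and drop the task.
    #
    #   class EarlyExitTask(TaskPool.Task):
    #       ...
    #       def onStderr(self, text):
    #           if "FATAL" in text:
    #               raise TaskPool.TerminateTask()
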
    def __init__(self, tasks, cwd=".", job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = next(self.pending, None)

    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull) as devnull:
            while True:
                while len(running) < self.job_limit and self.next_pending:
                    task = self.next_pending
                    p = Popen(
                        task.cmd(),
                        bufsize=16384,
                        stdin=devnull,
                        stdout=PIPE,
                        stderr=PIPE,
                        cwd=self.cwd,
                    )

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    task.start(p, time.time() + self.timeout)
                    running.add(task)
                    self.next_pending = next(self.pending, None)

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)
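                # (The max(..., 0) clamp matters: if a deadline has already
                # passed, select() below gets a timeout of 0 and simply polls,
                # rather than being handed a negative timeout.)
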
                # Wait for output or a timeout.
                stdouts_and_stderrs = [t.pipe.stdout for t in running] + [
                    t.pipe.stderr for t in running
                ]
                (readable, w, x) = select.select(
                    stdouts_and_stderrs, [], [], secs_to_next_deadline
                )
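                # 'finished' collects tasks whose stderr has hit EOF (they are
                # reaped below); 'terminate' collects tasks whose handlers
                # raised TerminateTask and so must be killed.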
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if len(output):
                            try:
                                t.onStdout(output.decode("utf-8"))
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if len(output):
                            try:
                                t.onStderr(output.decode("utf-8"))
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                for t in terminate:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline.
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None


def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Python 2.6+
    try:
        import multiprocessing

        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # POSIX
    try:
        res = int(os.sysconf("SC_NPROCESSORS_ONLN"))
        if res > 0:
            return res
    except (AttributeError, ValueError):
        pass

    # Windows
    try:
        res = int(os.environ["NUMBER_OF_PROCESSORS"])
        if res > 0:
            return res
    except (KeyError, ValueError):
        pass

    return 1


if __name__ == "__main__":
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        sorted = []

        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n

            def start(self, pipe, deadline):
                super(SortableTask, self).start(pipe, deadline)

            def cmd(self):
                return ["sh", "-c", "echo out; sleep %d; echo err>&2" % (self.n,)]

            def onStdout(self, text):
                print("%d stdout: %r" % (self.n, text))

            def onStderr(self, text):
                print("%d stderr: %r" % (self.n, text))

            def onFinished(self, returncode):
                print("%d (rc=%d)" % (self.n, returncode))
                sorted.append(self.n)

            def onTimeout(self):
                print("%d timed out" % (self.n,))

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return sorted

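    # Note: with timeout=15, the 21- and 34-second tasks are expected to hit
    # onTimeout rather than onFinished, so they should be missing from the
    # printed result.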
    print(repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15)))