Bug 1661935 - Introduce Subprocess.connectRunning() and ManagedProcess r=robwu
Differential Revision: https://phabricator.services.mozilla.com/D229496
This commit is contained in:
@@ -188,6 +188,19 @@ export var Subprocess = {
|
||||
let path = lazy.SubprocessImpl.pathSearch(command, environment);
|
||||
return Promise.resolve(path);
|
||||
},
|
||||
|
||||
/**
|
||||
* Connect to an already-running subprocess
|
||||
* given the file descriptors for its stdin, stdout and stderr.
|
||||
*
|
||||
* @param {number[]} fds
|
||||
* A list of three file descriptors [stdin, stdout, stderr].
|
||||
*
|
||||
* @returns {Promise<Process>}
|
||||
*/
|
||||
connectRunning(fds) {
|
||||
return lazy.SubprocessImpl.connectRunning(fds);
|
||||
},
|
||||
};
|
||||
|
||||
Object.assign(Subprocess, SubprocessConstants);
|
||||
|
||||
@@ -635,6 +635,16 @@ export class BaseProcess {
|
||||
});
|
||||
}
|
||||
|
||||
static fromRunning(options) {
|
||||
let worker = this.getWorker();
|
||||
|
||||
return worker
|
||||
.call("connectRunning", [options])
|
||||
.then(({ processId, fds }) => {
|
||||
return new this(worker, processId, fds, null);
|
||||
});
|
||||
}
|
||||
|
||||
static get WORKER_URL() {
|
||||
throw new Error("Not implemented");
|
||||
}
|
||||
|
||||
@@ -54,6 +54,8 @@ var libc = new Library("libc", LIBC_CHOICES, {
|
||||
|
||||
close: [ctypes.default_abi, ctypes.int, ctypes.int /* fildes */],
|
||||
|
||||
dup: [ctypes.default_abi, ctypes.int, ctypes.int],
|
||||
|
||||
fcntl: [
|
||||
ctypes.default_abi,
|
||||
ctypes.int,
|
||||
|
||||
@@ -198,6 +198,10 @@ var SubprocessUnix = {
|
||||
error.errorCode = SubprocessConstants.ERROR_BAD_EXECUTABLE;
|
||||
throw error;
|
||||
},
|
||||
|
||||
connectRunning(options) {
|
||||
return Process.fromRunning(options);
|
||||
},
|
||||
};
|
||||
|
||||
export var SubprocessImpl = SubprocessUnix;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
"use strict";
|
||||
|
||||
/* exported Process */
|
||||
/* exported Process, ManagedProcess */
|
||||
|
||||
/* import-globals-from subprocess_shared.js */
|
||||
/* import-globals-from subprocess_shared_unix.js */
|
||||
@@ -464,6 +464,96 @@ class Process extends BaseProcess {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Wrapping file descriptors of already-running process to allow interacting
|
||||
* with the process via the Subprocess module.
|
||||
*
|
||||
* This is used e.g. by NativeMessaging code when interfacing with XDG
|
||||
* WebExtensions portal (required when running under Flatpak or Snap). The
|
||||
* actual portal binary is executed outside and file descriptors will be shared
|
||||
* over DBus so interaction can happen with the portal.
|
||||
*
|
||||
* The file descriptors are wrapped in a unix.Fd() to ensure proper cleanup as
|
||||
* long as the ManagedProcess instance is properly disposed off: kill() should
|
||||
* ensure this is the case.
|
||||
*
|
||||
* All the file descriptors will be monitored by poll() to try and detect
|
||||
* process termination.
|
||||
* */
|
||||
class ManagedProcess extends BaseProcess {
|
||||
/*
|
||||
* Connect to an already running process that was spawned externally,
|
||||
* through numeric stdin/stdout/stderr file descriptors.
|
||||
*
|
||||
* @param {number[]} receivedFDs
|
||||
* An array of file descriptors (stdin, stdout and stderr).
|
||||
*/
|
||||
connectRunning(receivedFDs) {
|
||||
const fdCheck = fds => {
|
||||
for (let value of io.pipes.values()) {
|
||||
const fd = parseInt(value.fd.toString(), 10);
|
||||
return fd === fds[0] || fd === fds[1] || fd === fds[2];
|
||||
}
|
||||
};
|
||||
|
||||
const alreadyUsed = fdCheck(receivedFDs);
|
||||
if (alreadyUsed) {
|
||||
throw new Error("Attempt to connect FDs already handled by Subprocess");
|
||||
}
|
||||
|
||||
this.pipes.push(new OutputPipe(this, unix.Fd(receivedFDs[0])));
|
||||
this.pipes.push(new InputPipe(this, unix.Fd(receivedFDs[1])));
|
||||
this.pipes.push(new InputPipe(this, unix.Fd(receivedFDs[2])));
|
||||
}
|
||||
|
||||
get pollEvents() {
|
||||
// No poll fd here: we don't have a handle to the process, only its fd.
|
||||
// Each fd of this.pipes is already polled by updatePollFds, and their
|
||||
// Pipe's onError() method calls our wait() method when the fd is closed.
|
||||
// We assume that the process has exited when all pipes have closed.
|
||||
|
||||
// ManagedProcess does not have onReady() or onError() definitions because
|
||||
// updatePollFds() does not call these when pollEvents is 0.
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Termination of the ManagedProcess: such a process is started/stopped
|
||||
* outside of the browser, the termination here ensures that all its pipes are
|
||||
* properly closed and that any code waiting for the process to terminate is
|
||||
* unblocked by resolving the Promise.
|
||||
* */
|
||||
kill() {
|
||||
this.pipes.forEach(p => p.close());
|
||||
this.resolveExit(this.exitCode);
|
||||
}
|
||||
|
||||
/* A ManagedProcess being ran outside of our PID namespace (Snap/Flatpak)
|
||||
* there is no realistic way to waitpid() here.
|
||||
* As noted in pollEvents, we consider the process closed if all of its fd
|
||||
* have closed.
|
||||
* */
|
||||
wait() {
|
||||
if (this.pipes.every(pipe => pipe.closed)) {
|
||||
// Actual exitCode is unknown, just return null.
|
||||
this.resolveExit(null);
|
||||
} else {
|
||||
io.updatePollFds();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* A ManagedProcess is already running, so here the spawn just performs the
|
||||
* connection of the file descriptors received.
|
||||
*
|
||||
* @param {array} options
|
||||
* An array of file descriptors from an existing process.
|
||||
* */
|
||||
spawn(options) {
|
||||
return this.connectRunning(options);
|
||||
}
|
||||
}
|
||||
|
||||
io = {
|
||||
pollFds: null,
|
||||
pollHandlers: null,
|
||||
|
||||
@@ -168,6 +168,12 @@ var SubprocessWin = {
|
||||
error.errorCode = SubprocessConstants.ERROR_BAD_EXECUTABLE;
|
||||
throw error;
|
||||
},
|
||||
|
||||
connectRunning(_options) {
|
||||
// Not relevant (yet?) on Windows. This is currently used only on Unix
|
||||
// for native messaging through the WebExtensions portal.
|
||||
throw new Error("Not implemented");
|
||||
},
|
||||
};
|
||||
|
||||
export var SubprocessImpl = SubprocessWin;
|
||||
|
||||
@@ -601,6 +601,12 @@ class Process extends BaseProcess {
|
||||
libc.CloseHandle(procInfo.hThread);
|
||||
}
|
||||
|
||||
connectRunning(_options) {
|
||||
// Not relevant (yet?) on Windows. This is currently used only on Unix
|
||||
// for native messaging through the WebExtensions portal.
|
||||
throw new Error("Not implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when our process handle is signaled as active, meaning the process
|
||||
* has exited.
|
||||
|
||||
@@ -106,6 +106,15 @@ let requests = {
|
||||
return { data: { processId, fds, pid: process.pid } };
|
||||
},
|
||||
|
||||
connectRunning(options) {
|
||||
let process = new ManagedProcess(options);
|
||||
let processId = process.id;
|
||||
|
||||
io.addProcess(process);
|
||||
|
||||
return { data: { processId, fds: process.pipes.map(pipe => pipe.id) } };
|
||||
},
|
||||
|
||||
kill(processId, force = false) {
|
||||
let process = io.getProcess(processId);
|
||||
|
||||
@@ -162,6 +171,18 @@ let requests = {
|
||||
Array.from(io.processes.values(), proc => proc.awaitFinished())
|
||||
);
|
||||
},
|
||||
|
||||
// It is the caller's responsability to make sure dup() is called on the FDs
|
||||
// returned here.
|
||||
getFds(processId) {
|
||||
// fd is a unix.Fd aka CDataFinalizer that wraps the actual integer. We can
|
||||
// retrieve its value via .toString(), if it has not been closed yet.
|
||||
let process = io.getProcess(processId);
|
||||
let pipes = process.pipes.map(p => parseInt(p.fd.toString(), 10));
|
||||
return {
|
||||
data: [pipes[0], pipes[1], pipes[2]],
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
onmessage = event => {
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
/* eslint-disable mozilla/no-arbitrary-setTimeout */
|
||||
"use strict";
|
||||
|
||||
add_task(async function test_subprocess_connectRunning() {
|
||||
if (AppConstants.platform === "win") {
|
||||
Assert.throws(
|
||||
() => Subprocess.connectRunning([42, 58, 63]),
|
||||
/Not implemented/
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let tempFile = Services.dirsvc.get("TmpD", Ci.nsIFile);
|
||||
tempFile.append("test-subprocess-connectRunning.txt");
|
||||
if (tempFile.exists()) {
|
||||
tempFile.remove(true);
|
||||
}
|
||||
registerCleanupFunction(async function () {
|
||||
tempFile.remove(true);
|
||||
});
|
||||
|
||||
let running = await Subprocess.call({
|
||||
command: await Subprocess.pathSearch("tee"),
|
||||
arguments: [tempFile.path],
|
||||
environment: {},
|
||||
stderr: "pipe",
|
||||
});
|
||||
let { getSubprocessImplForTest } = ChromeUtils.importESModule(
|
||||
"resource://gre/modules/Subprocess.sys.mjs"
|
||||
);
|
||||
let worker = getSubprocessImplForTest().Process.getWorker();
|
||||
let fds = await worker.call("getFds", [running.id]);
|
||||
|
||||
let wrongConnect = Subprocess.connectRunning(fds);
|
||||
await Assert.rejects(
|
||||
wrongConnect,
|
||||
function (error) {
|
||||
return /Attempt to connect FDs already handled by Subprocess/.test(
|
||||
error.message
|
||||
);
|
||||
},
|
||||
"Cannot reuse existing FDs in connectRunning"
|
||||
);
|
||||
|
||||
// The test needs to dup() the FDs because of how "tee" is launched above:
|
||||
// when Subprocess.call() launched "tee" there will be Pipe created that
|
||||
// obviously refers to OS level FDs. When connectRunning() will be executed
|
||||
// then there will also be Pipe object created referencing the FDs. This
|
||||
// leads to a situation where 'running.{stdin,stdout}' and
|
||||
// 'proc.{stdin,stdout}' are sharing the same OS level FDs. Thus dup() is
|
||||
// required to make sure operations on those are done correctly. Especially
|
||||
// a Pipe on the JS side wraps the FD in a unix.Fd() which ensures a close()
|
||||
// is done so close() twice on the same FD is wrong.
|
||||
let { libc } = ChromeUtils.importESModule(
|
||||
"resource://gre/modules/subprocess/subprocess_unix.sys.mjs"
|
||||
);
|
||||
// unix.Fd() here should ensure there's a close() done.
|
||||
const duped_fds = fds.map(e => libc.dup(e));
|
||||
|
||||
let proc = await Subprocess.connectRunning(duped_fds);
|
||||
equal(proc.pid, null, "Already running process pid is null");
|
||||
|
||||
let contents = "lorem ipsum";
|
||||
let writeOp = proc.stdin.write(contents);
|
||||
equal(
|
||||
(await writeOp).bytesWritten,
|
||||
contents.length,
|
||||
"Contents correctly written to stdin"
|
||||
);
|
||||
let readOp = running.stdout.readString(contents.length);
|
||||
equal(await readOp, contents, "Pipes communication is functional");
|
||||
await running.kill();
|
||||
ok(tempFile.exists(), "temp file was written to");
|
||||
equal(
|
||||
await IOUtils.readUTF8(tempFile.path),
|
||||
contents,
|
||||
"Contents correctly written to temp file"
|
||||
);
|
||||
await proc.kill();
|
||||
});
|
||||
|
||||
add_task(async function test_cleaned_up() {
|
||||
let { getSubprocessImplForTest } = ChromeUtils.importESModule(
|
||||
"resource://gre/modules/Subprocess.sys.mjs"
|
||||
);
|
||||
|
||||
let worker = getSubprocessImplForTest().Process.getWorker();
|
||||
|
||||
let openFiles = await worker.call("getOpenFiles", []);
|
||||
let processes = await worker.call("getProcesses", []);
|
||||
|
||||
equal(openFiles.size, 0, "No remaining open files");
|
||||
equal(processes.size, 0, "No remaining processes");
|
||||
});
|
||||
@@ -15,6 +15,8 @@ skip-if = [
|
||||
]
|
||||
run-sequentially = "very high failure rate in parallel"
|
||||
|
||||
["test_subprocess_connectRunning.js"]
|
||||
|
||||
["test_subprocess_getEnvironment.js"]
|
||||
|
||||
["test_subprocess_pathSearch.js"]
|
||||
|
||||
Reference in New Issue
Block a user