Bug 1979546 - Replace WaitForMultipleObjects with IOCP in Subprocess a=RyanVM DONTBUILD

The current WaitForMultipleObjects-based implementation causes a
deadlock if 16 or more processes are spawned, as explained in the bug.

This patch replaces the use of events and WaitForMultipleObjects with
I/O Completion Ports (IOCP) as the mechanism to monitor I/O progress
and process terminations. This is not subject to the
MAXIMUM_WAIT_OBJECTS (64) limit of WaitForMultipleObjects.

Original Revision: https://phabricator.services.mozilla.com/D258968

Differential Revision: https://phabricator.services.mozilla.com/D266337
This commit is contained in:
Rob Wu
2025-09-26 14:46:13 +00:00
committed by rvandermeulen@mozilla.com
parent 827a745d82
commit 9ad1e61544
8 changed files with 726 additions and 121 deletions

View File

@@ -12,6 +12,8 @@ tags = "webextensions"
["test_ext_native_messaging.js"]
run-sequentially = "very high failure rate in parallel"
["test_ext_native_messaging_concurrent.js"]
["test_ext_native_messaging_perf.js"]
skip-if = ["tsan"] # Unreasonably slow, bug 1612707

View File

@@ -0,0 +1,107 @@
/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
/* vim: set sts=2 sw=2 et tw=80: */
"use strict";
AddonTestUtils.init(this);
const ECHO_BODY = String.raw`
import struct
import sys
stdin = getattr(sys.stdin, 'buffer', sys.stdin)
stdout = getattr(sys.stdout, 'buffer', sys.stdout)
while True:
rawlen = stdin.read(4)
if len(rawlen) == 0:
sys.exit(0)
msglen = struct.unpack('@I', rawlen)[0]
msg = stdin.read(msglen)
stdout.write(struct.pack('@I', msglen))
stdout.write(msg)
`;
const SCRIPTS = [
{
name: "echo",
description: "a native app that echoes back messages it receives",
script: ECHO_BODY.replace(/^ {2}/gm, ""),
},
];
function loadTestExtension({ background }) {
return ExtensionTestUtils.loadExtension({
background,
manifest: {
browser_specific_settings: { gecko: { id: ID } },
permissions: ["nativeMessaging"],
},
});
}
add_setup(async () => {
await ExtensionTestUtils.startAddonManager();
await setupHosts(SCRIPTS);
});
// Regression test for https://bugzilla.mozilla.org/show_bug.cgi?id=1979546
add_task(async function test_many_connectNative_calls() {
let extension = loadTestExtension({
async background() {
// Prior to bug 1979546 being fixed, the test got stuck on Windows at 16.
// Let's verify that things run smoothly even if we launch "many" more,
// e.g. 70 (arbitrarily chosen, higher than 64).
const NUMBER_OF_CONCURRENT_NATIVE_MESSAGING_APPS = 70;
const openPorts = [];
const remainingMsgs = new Set();
const firstMessagePromises = [];
for (let i = 0; i < NUMBER_OF_CONCURRENT_NATIVE_MESSAGING_APPS; ++i) {
const dummyMsg = `pingpong-${i}`;
const port = browser.runtime.connectNative("echo");
openPorts[i] = port;
remainingMsgs.add(i);
firstMessagePromises[i] = new Promise(resolve => {
port.onMessage.addListener(msg => {
browser.test.assertEq(dummyMsg, msg, `Echoed back message ${i}`);
remainingMsgs.delete(i);
browser.test.log(`Remaining: ${Array.from(remainingMsgs)}`);
resolve();
});
port.onDisconnect.addListener(() => {
if (remainingMsgs.delete(i)) {
// If the program somehow exits before it responded, note the
// failure and continue (do not stay stuck).
browser.test.fail(`onDisconnect fired before onMessage: ${i}`);
resolve();
} else {
// onDisconnect should not fire when we call port.disconnect()
// below.
browser.test.fail(`Unexpected port.onDisconnect: ${i}`);
}
});
});
port.postMessage(dummyMsg);
}
browser.test.log(`Awaiting replies to: ${Array.from(remainingMsgs)}`);
await Promise.all(firstMessagePromises);
browser.test.log("Now verifying sendNativeMessage behavior");
browser.test.assertEq(
await browser.runtime.sendNativeMessage("echo", "one_off_msg"),
"one_off_msg",
"sendNativeMessage can still roundtrip after so many connectNative"
);
for (const port of openPorts) {
port.disconnect();
}
browser.test.sendMessage("done");
},
});
await extension.startup();
await extension.awaitMessage("done");
info("Waiting for all echo processes to have exit");
await waitForSubprocessExit();
await extension.unload();
});

View File

@@ -87,6 +87,7 @@ Object.assign(win32, {
ERROR_HANDLE_EOF: 38,
ERROR_BROKEN_PIPE: 109,
ERROR_INSUFFICIENT_BUFFER: 122,
ERROR_ABANDONED_WAIT_0: 735,
FILE_ATTRIBUTE_NORMAL: 0x00000080,
FILE_FLAG_OVERLAPPED: 0x40000000,
@@ -108,10 +109,14 @@ Object.assign(win32, {
PROC_THREAD_ATTRIBUTE_HANDLE_LIST: 0x00020002,
JobObjectAssociateCompletionPortInformation: 7,
JobObjectBasicLimitInformation: 2,
JobObjectExtendedLimitInformation: 9,
JOB_OBJECT_LIMIT_BREAKAWAY_OK: 0x00000800,
JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS: 8,
JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO: 4,
JOB_OBJECT_MSG_EXIT_PROCESS: 7,
// These constants are 32-bit unsigned integers, but Windows defines
// them as negative integers cast to an unsigned type.
@@ -120,7 +125,6 @@ Object.assign(win32, {
STD_ERROR_HANDLE: -12 + 0x100000000,
WAIT_TIMEOUT: 0x00000102,
WAIT_FAILED: 0xffffffff,
});
Object.assign(win32, {
@@ -150,6 +154,10 @@ Object.assign(win32, {
});
Object.assign(win32, {
JOBOBJECT_ASSOCIATE_COMPLETION_PORT: new ctypes.StructType(
"JOBOBJECT_ASSOCIATE_COMPLETION_PORT",
[{ CompletionKey: win32.PVOID }, { CompletionPort: win32.HANDLE }]
),
JOBOBJECT_EXTENDED_LIMIT_INFORMATION: new ctypes.StructType(
"JOBOBJECT_EXTENDED_LIMIT_INFORMATION",
[
@@ -222,15 +230,6 @@ var libc = new Library("libc", LIBC_CHOICES, {
CloseHandle: [win32.WINAPI, win32.BOOL, win32.HANDLE /* hObject */],
CreateEventW: [
win32.WINAPI,
win32.HANDLE,
win32.SECURITY_ATTRIBUTES.ptr /* opt lpEventAttributes */,
win32.BOOL /* bManualReset */,
win32.BOOL /* bInitialState */,
win32.LPWSTR /* lpName */,
],
CreateFileW: [
win32.WINAPI,
win32.HANDLE,
@@ -243,6 +242,15 @@ var libc = new Library("libc", LIBC_CHOICES, {
win32.HANDLE /* opt hTemplateFile */,
],
CreateIoCompletionPort: [
win32.WINAPI,
win32.HANDLE,
win32.HANDLE /* FileHandle */,
win32.HANDLE /* opt ExistingCompletionPort */,
win32.ULONG_PTR /* CompletionKey */,
win32.DWORD /* NumberOfConcurrentThreads */,
],
CreateJobObjectW: [
win32.WINAPI,
win32.HANDLE,
@@ -287,15 +295,6 @@ var libc = new Library("libc", LIBC_CHOICES, {
win32.PROCESS_INFORMATION.ptr /* out lpProcessInformation */,
],
CreateSemaphoreW: [
win32.WINAPI,
win32.HANDLE,
win32.SECURITY_ATTRIBUTES.ptr /* opt lpSemaphoreAttributes */,
win32.LONG /* lInitialCount */,
win32.LONG /* lMaximumCount */,
win32.LPCWSTR /* opt lpName */,
],
DeleteProcThreadAttributeList: [
win32.WINAPI,
win32.VOID,
@@ -342,6 +341,16 @@ var libc = new Library("libc", LIBC_CHOICES, {
win32.BOOL /* bWait */,
],
GetQueuedCompletionStatus: [
win32.WINAPI,
win32.BOOL,
win32.HANDLE /* CompletionPort */,
win32.LPDWORD /* lpNumberOfBytesTransferred */,
win32.ULONG_PTR.ptr /* out lpCompletionKey */,
win32.OVERLAPPED.ptr.ptr /* out lpOverlapped */,
win32.DWORD /* dwMilliseconds */,
],
GetStdHandle: [win32.WINAPI, win32.HANDLE, win32.DWORD /* nStdHandle */],
InitializeProcThreadAttributeList: [
@@ -353,6 +362,15 @@ var libc = new Library("libc", LIBC_CHOICES, {
win32.PSIZE_T /* in/out lpSize */,
],
PostQueuedCompletionStatus: [
win32.WINAPI,
win32.BOOL,
win32.HANDLE /* CompletionPort */,
win32.DWORD /* dwNumberOfBytesTransferred */,
win32.ULONG_PTR /* dwCompletionKey */,
win32.OVERLAPPED.ptr /* opt lpOverlapped */,
],
ReadFile: [
win32.WINAPI,
win32.BOOL,
@@ -363,14 +381,6 @@ var libc = new Library("libc", LIBC_CHOICES, {
win32.OVERLAPPED.ptr /* opt in/out lpOverlapped */,
],
ReleaseSemaphore: [
win32.WINAPI,
win32.BOOL,
win32.HANDLE /* hSemaphore */,
win32.LONG /* lReleaseCount */,
win32.LONG.ptr /* opt out lpPreviousCount */,
],
ResumeThread: [win32.WINAPI, win32.DWORD, win32.HANDLE /* hThread */],
SetInformationJobObject: [
@@ -408,23 +418,6 @@ var libc = new Library("libc", LIBC_CHOICES, {
win32.PSIZE_T /* opt lpReturnSize */,
],
WaitForMultipleObjects: [
win32.WINAPI,
win32.DWORD,
win32.DWORD /* nCount */,
win32.HANDLE.ptr /* hHandles */,
win32.BOOL /* bWaitAll */,
win32.DWORD /* dwMilliseconds */,
],
WaitForSingleObject: [
win32.WINAPI,
win32.DWORD,
win32.HANDLE /* hHandle */,
win32.BOOL /* bWaitAll */,
win32.DWORD /* dwMilliseconds */,
],
WriteFile: [
win32.WINAPI,
win32.BOOL,
@@ -436,6 +429,10 @@ var libc = new Library("libc", LIBC_CHOICES, {
],
});
// Custom constant to use as CompletionKey / dwCompletionKey.
// See also IOCPKeyGenin subprocess_win.worker.js.
win32.IOCP_COMPLETION_KEY_WAKE_WORKER = 1;
let nextNamedPipeId = 0;
win32.Handle = function (handle) {

View File

@@ -31,20 +31,35 @@ class WinPromiseWorker extends PromiseWorker {
constructor(...args) {
super(...args);
this.signalEvent = libc.CreateSemaphoreW(null, 0, 32, null);
// Used by the worker thread to block until any I/O completes. Used on this
// side to unblock the worker to receive messages from postMessage below.
const iocp = libc.CreateIoCompletionPort(
win32.INVALID_HANDLE_VALUE,
win32.NULL_HANDLE_VALUE,
0,
1 // The worker thread is the only consumer of IOCP.
);
if (!iocp) {
throw new Error(`Failed to create IOCP: ${ctypes.winLastError}`);
}
// Wrap in Handle to ensure that CloseHandle is called after worker exits.
this.iocpHandle = win32.Handle(iocp);
this.call("init", [
{
comspec: Services.env.get("COMSPEC"),
signalEvent: String(
ctypes.cast(this.signalEvent, ctypes.uintptr_t).value
),
iocpCompletionPort: String(ctypes.cast(iocp, ctypes.uintptr_t).value),
},
]);
}
signalWorker() {
libc.ReleaseSemaphore(this.signalEvent, 1, null);
libc.PostQueuedCompletionStatus(
this.iocpHandle,
0,
win32.IOCP_COMPLETION_KEY_WAKE_WORKER,
win32.OVERLAPPED.ptr(0)
);
}
postMessage(...args) {

View File

@@ -23,9 +23,52 @@ const TERMINATE_EXIT_CODE = 0x7f;
let io;
// We use IOCP to monitor for IO completion on Pipe and process termination:
// - CreateIoCompletionPort with Pipe.
// - JOBOBJECT_ASSOCIATE_COMPLETION_PORT with Process.
// - GetQueuedCompletionStatus to query completion (receives CompletionKey).
//
// The CompletionKey can be an arbitrarily chosen value, used to identify the
// target of the IOCP notification. It is defined as ctypes.uintptr_t, so in
// theory it could be 64 or 32 bit. The number of pipes and processes that we
// create can be represented in fewer than 32 bits. We therefore use the higher
// order bits to tag the ID to distinguish Pipes, Processes and custom IOCP
// messages (e.g. IOCP_COMPLETION_KEY_WAKE_WORKER) from each other.
class IOCPKeyGen {
// The IOCP_KEY_IS_PIPE and IOCP_KEY_IS_PROC bits are mutually exclusive.
static IOCP_KEY_IS_PIPE = 1 << 31;
static IOCP_KEY_IS_PROC = 1 << 30;
static keyForPipe(pipe) {
// pipe.id starts at 0 (nextPipeId in this file).
return (IOCPKeyGen.IOCP_KEY_IS_PIPE | pipe.id) >>> 0;
}
static keyForProcess(process) {
// process.id starts at 0 (nextProcessId++ in subprocess_worker_common.js).
return (IOCPKeyGen.IOCP_KEY_IS_PROC | process.id) >>> 0;
}
static isPipeKey(completionKey) {
return !!(IOCPKeyGen.IOCP_KEY_IS_PIPE & completionKey);
}
static isProcessKey(completionKey) {
return !!(IOCPKeyGen.IOCP_KEY_IS_PROC & completionKey);
}
// Only use this if isPipeKey(completionKey) is true:
static pipeIdFromKey(completionKey) {
return (~IOCPKeyGen.IOCP_KEY_IS_PIPE & completionKey) >>> 0;
}
// Only use this if isProcessKey(completionKey) is true:
static processIdFromKey(completionKey) {
return (~IOCPKeyGen.IOCP_KEY_IS_PROC & completionKey) >>> 0;
}
}
let nextPipeId = 0;
class Pipe extends BasePipe {
// origHandle MUST be opened with the FILE_FLAG_OVERLAPPED flag.
constructor(process, origHandle) {
super();
@@ -49,21 +92,24 @@ class Pipe extends BasePipe {
this.handle = win32.Handle(handle);
let event = libc.CreateEventW(null, false, false, null);
this.overlapped = win32.OVERLAPPED();
this.overlapped.hEvent = event;
this._event = win32.Handle(event);
let ok = libc.CreateIoCompletionPort(
handle,
io.iocpCompletionPort,
IOCPKeyGen.keyForPipe(this),
0 // Ignored.
);
if (!ok) {
// Truly unexpected. We won't be able to observe IO on this Pipe.
debug(`Failed to associate IOCP: ${ctypes.winLastError}`);
}
this.buffer = null;
}
get event() {
if (this.pending.length) {
return this._event;
}
return null;
hasPendingIO() {
return !!this.pending.length;
}
maybeClose() {}
@@ -96,7 +142,6 @@ class Pipe extends BasePipe {
if (!this.closed) {
this.handle.dispose();
this._event.dispose();
io.pipes.delete(this.id);
@@ -327,25 +372,6 @@ class OutputPipe extends Pipe {
}
}
class Signal {
constructor(event) {
this.event = event;
}
cleanup() {
libc.CloseHandle(this.event);
this.event = null;
}
onError() {
io.shutdown();
}
onReady() {
io.messageCount += 1;
}
}
class Process extends BaseProcess {
constructor(...args) {
super(...args);
@@ -353,14 +379,6 @@ class Process extends BaseProcess {
this.killed = false;
}
/**
* Returns our process handle for use as an event in a WaitForMultipleObjects
* call.
*/
get event() {
return this.handle;
}
/**
* Forcibly terminates the process.
*/
@@ -572,9 +590,29 @@ class Process extends BaseProcess {
ctypes.cast(info.address(), ctypes.voidptr_t),
info.constructor.size
);
errorMessage = `Failed to set job limits: 0x${(
ctypes.winLastError || 0
).toString(16)}`;
if (!ok) {
errorMessage = `Failed to set job limits: 0x${(
ctypes.winLastError || 0
).toString(16)}`;
}
}
if (ok) {
let acp = win32.JOBOBJECT_ASSOCIATE_COMPLETION_PORT();
acp.CompletionKey = win32.PVOID(IOCPKeyGen.keyForProcess(this));
acp.CompletionPort = io.iocpCompletionPort;
ok = libc.SetInformationJobObject(
this.jobHandle,
win32.JobObjectAssociateCompletionPortInformation,
ctypes.cast(acp.address(), ctypes.voidptr_t),
acp.constructor.size
);
if (!ok) {
errorMessage = `Failed to set IOCP: 0x${(
ctypes.winLastError || 0
).toString(16)}`;
}
}
if (ok) {
@@ -644,6 +682,9 @@ class Process extends BaseProcess {
this.handle.dispose();
this.handle = null;
// This also terminates all child processes under this process, unless
// the child process was created with the CREATE_BREAKAWAY_FROM_JOB flag,
// in which case it would not be part of our job (and therefore survive).
libc.TerminateJobObject(this.jobHandle, TERMINATE_EXIT_CODE);
this.jobHandle.dispose();
this.jobHandle = null;
@@ -660,8 +701,7 @@ class Process extends BaseProcess {
}
io = {
events: null,
eventHandlers: null,
iocpCompletionPort: null,
pipes: new Map(),
@@ -676,11 +716,12 @@ io = {
init(details) {
this.comspec = details.comspec;
let signalEvent = ctypes.cast(
ctypes.uintptr_t(details.signalEvent),
// Note: not wrapped in win32.Handle - parent thread is responsible for
// releasing the resource when this thread terminates.
this.iocpCompletionPort = ctypes.cast(
ctypes.uintptr_t(details.iocpCompletionPort),
win32.HANDLE
);
this.signal = new Signal(signalEvent);
this.updatePollEvents();
setTimeout(this.loop.bind(this), 0);
@@ -690,9 +731,6 @@ io = {
if (this.running) {
this.running = false;
this.signal.cleanup();
this.signal = null;
self.postMessage({ msg: "close" });
self.close();
}
@@ -719,28 +757,29 @@ io = {
},
updatePollEvents() {
let handlers = [
this.signal,
...this.pipes.values(),
...this.processes.values(),
];
handlers = handlers.filter(handler => handler.event);
let shouldPoll = false;
if (this.processes.size) {
// As long as the process is alive, it may notify IOCP.
// When the process exits, we'll remove it from io.processes.
shouldPoll = true;
} else {
for (let pipe of this.pipes.values()) {
if (pipe.hasPendingIO()) {
shouldPoll = true;
break;
}
}
}
// Our poll loop is only useful if we've got at least 1 thing to poll other than our own
// signal.
if (handlers.length == 1) {
if (!shouldPoll) {
this.polling = false;
} else if (!this.polling && this.running) {
// Restart the poll loop if necessary:
setTimeout(this.loop.bind(this), 0);
this.polling = true;
}
this.eventHandlers = handlers;
let handles = handlers.map(handler => handler.event);
this.events = win32.HANDLE.array()(handles);
},
loop() {
@@ -751,28 +790,108 @@ io = {
},
poll() {
// On the first call, wait until any IO signaled. After that, immediately
// process all other IO that have signaled in the meantime.
let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
for (; ; timeout = 0) {
let events = this.events;
let handlers = this.eventHandlers;
let result = libc.WaitForMultipleObjects(
events.length,
events,
false,
let numberOfBytesTransferred = win32.DWORD();
let completionKeyOut = win32.ULONG_PTR();
let lpOverlapped = win32.OVERLAPPED.ptr(0);
let ok = libc.GetQueuedCompletionStatus(
io.iocpCompletionPort,
numberOfBytesTransferred.address(),
completionKeyOut.address(),
lpOverlapped.address(),
timeout
);
if (result < handlers.length) {
const deqWinErr = ok ? 0 : ctypes.winLastError;
if (!ok) {
if (deqWinErr === win32.WAIT_TIMEOUT) {
// No changes, return (caller may schedule another loop/poll).
break;
}
if (deqWinErr === win32.ERROR_ABANDONED_WAIT_0) {
// iocpCompletionPort was closed.
io.shutdown();
break;
}
if (lpOverlapped.isNull()) {
// "If *lpOverlapped is NULL, the function did not dequeue a
// completion packet from the completion port."
// No remaining data, return (caller may schedule another loop/poll).
break;
}
// Received completion packet that failed, fall through.
}
let completionKey = parseInt(completionKeyOut.value, 10);
if (completionKey === win32.IOCP_COMPLETION_KEY_WAKE_WORKER) {
// Custom notification from the parent thread.
io.messageCount += 1;
// Continue to process any completed IO, then return (and eventually
// yield to the event loop to allow onmessage to receive messages).
continue;
}
if (IOCPKeyGen.isPipeKey(completionKey)) {
const pipeId = IOCPKeyGen.pipeIdFromKey(completionKey);
const pipe = io.pipes.get(pipeId);
if (!pipe) {
debug(`IOCP notification for unknown pipe: ${pipeId}`);
continue;
}
if (deqWinErr === win32.ERROR_BROKEN_PIPE) {
pipe.onError();
continue;
}
try {
handlers[result].onReady();
pipe.onReady();
} catch (e) {
console.error(e);
debug(`Worker error: ${e} :: ${e.stack}`);
handlers[result].onError();
pipe.onError();
}
} else if (IOCPKeyGen.isProcessKey(completionKey)) {
// This is a notification via JOBOBJECT_ASSOCIATE_COMPLETION_PORT.
// "numberOfBytesTransferred" is any of the JOB_OBJECT_MSG_* values.
// https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-jobobject_associate_completion_port
const jobMsgId = numberOfBytesTransferred.value;
const processId = IOCPKeyGen.processIdFromKey(completionKey);
// The job reaching process count zero is a very strong indication that
// the process has exit. We also listen to the other *EXIT_PROCESS
// messages in case the process spawned child processes under the job,
// because that would suppress the JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO
// notification and prevent us from detecting process termination.
const isExit =
jobMsgId === win32.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS ||
jobMsgId === win32.JOB_OBJECT_MSG_EXIT_PROCESS;
const isJobZero = jobMsgId === win32.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO;
if (!isExit && !isJobZero) {
// Ignore non-exit notifications, such as additional new processes.
continue;
}
const process = io.processes.get(processId);
if (!process) {
// This may happen when isJobZero == true, because we could have
// observed isExit == true before.
continue;
}
if (isExit) {
let realPid = ctypes.cast(lpOverlapped, win32.DWORD).value;
if (process.pid !== realPid) {
// A random child process (spawned by the process) has exited.
continue;
}
}
try {
process.onReady();
} catch (e) {
// This is really unexpected, but don't break the poll loop.
console.error(e);
debug(`Worker error: ${e} :: ${e.stack}`);
}
} else {
break;
debug(`Unexpected IOCP CompletionKey: ${completionKey}`);
}
}
},

View File

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
# This file is run by test_subprocess_child.js. To test this file in isolation:
# python3 -u toolkit/modules/subprocess/test/xpcshell/data_test_child.py spawn_child_and_exit
# Then separately, to the displayed URL "Listening at http://127.0.0.1:12345",
# with 12345 being a random port,
# curl http://127.0.0.1:12345 -X DELETE # request exit of parent
# curl http://127.0.0.1:12345 # request exit of child
import sys
import time
def sleep_for_a_little_bit():
time.sleep(0.2)
def spawn_child_and_exit(is_breakaway_job):
"""
Spawns and exits child processes to allow tests to verify that they detect
specifically the exit of this (parent) process.
The expected sequence of outputs is as follows:
1. parent_start
2. first_child_start_and_exit
3. parent_after_first_child_exit
4. spawned_child_start
5. Listening at http://127.0.0.1:12345 - with 12345 being random port
6. child_received_http_request - DELETE request from test.
7. data_from_child:kill_parent
8. parent_exit
( now the parent has exit)
( child_process_still_alive_1 response sent to request from step 6)
( wait for new request from client to request child to exit )
( child_process_still_alive_2 response sent to that new request )
9. spawned_child_exit
"""
import subprocess
print("1. parent_start", flush=True)
# Start and exit a child process (used to make sure that we do not
# mistakenly detect an exited child process for the parent process).
subprocess.run(
[sys.executable, "-c", "print('2. first_child_start_and_exit')"],
stdout=sys.stdout,
stderr=sys.stderr,
)
# Wait a bit to make sure that the child's exit signal has been processed.
# This is not strictly needed, because we don't expect the child to affect
# the parent, but in case of a flawed implementation, this would enable the
# test to detect a bad implementation (by observing the exit before the
# "parent_after_first_child_exit" message below).
sleep_for_a_little_bit()
print("3. parent_after_first_child_exit", flush=True)
creationflags = 0
if is_breakaway_job:
# See comment in test_subprocess_child.js; in short we need this flag
# to make sure that the child outlives the parent when the subprocess
# implementation calls TerminateJobObject.
creationflags = subprocess.CREATE_BREAKAWAY_FROM_JOB
child_proc = subprocess.Popen(
[sys.executable, "-u", __file__, "spawned_child"],
creationflags=creationflags,
# We don't need this pipe, but when this side of the process exits,
# the pipe is closed, which the child can use to detect that the parent
# has exit.
stdin=subprocess.PIPE,
# We are using stdout as a control channel to allow the child to
# notify us when the process is done.
stdout=subprocess.PIPE,
# stderr is redirected to the real stdout, so that the caller can
# still observe print() from spawned_child.
stderr=sys.stdout,
)
# This blocks until the child has notified us.
data_from_child = child_proc.stdout.readline().decode().rstrip()
print(f"7. data_from_child:{data_from_child}", flush=True)
print("8. parent_exit", flush=True)
# Wait a little bit to make sure that stdout has been flushed (and read by
# the caller) when the process exits.
sleep_for_a_little_bit()
sys.exit(0)
def spawned_child():
import http.server
import socketserver
def print_to_parent_stdout(msg):
# The parent maps our stderr to its stdout.
print(msg, flush=True, file=sys.stderr)
# This is spawned via spawn_child_and_exit.
print_to_parent_stdout("4. spawned_child_start")
class RequestHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, *args):
pass # Disable logging
def do_DELETE(self):
print_to_parent_stdout("6. child_received_http_request")
# Let the caller know that we are responsive.
self.send_response(200)
self.send_header("Connection", "close")
self.end_headers()
# Wait a little bit to allow the network request to be
# processed by the client. If for some reason the termination
# of the parent also kills the child, then at least the client
# has had a chance to become aware of it.
sleep_for_a_little_bit()
# Now ask the parent to exit, and continue here.
print("kill_parent", flush=True)
# When the parent exits, stdin closes, which we detect here:
res = sys.stdin.read(1)
if len(res):
print_to_parent_stdout("spawned_child_UNEXPECTED_STDIN")
# If we make it here, it means that this child outlived the
# parent, and we can let the client know.
# (if the child process is terminated prematurely, the client
# would also know through a disconnected socket).
self.wfile.write(b"child_process_still_alive_1")
def do_GET(self):
self.send_response(200)
self.send_header("Connection", "close")
self.end_headers()
self.wfile.write(b"child_process_still_alive_2")
# Starts a server that handles two requests and then closes the server.
with socketserver.TCPServer(("127.0.0.1", 0), RequestHandler) as server:
host, port = server.server_address[:2]
print_to_parent_stdout(f"5. Listening at http://{host}:{port}")
# Expecting DELETE request (do_DELETE)
server.handle_request()
# Expecting GET request (do_GET)
server.handle_request()
print_to_parent_stdout("9. spawned_child_exit")
sys.exit(0)
cmd = sys.argv[1]
if cmd == "spawn_child_and_exit":
spawn_child_and_exit(is_breakaway_job=False)
elif cmd == "spawn_child_in_breakaway_job_and_exit":
spawn_child_and_exit(is_breakaway_job=True)
elif cmd == "spawned_child":
spawned_child()
else:
raise Exception(f"Unknown command: {cmd}")

View File

@@ -0,0 +1,203 @@
"use strict";
const { TestUtils } = ChromeUtils.importESModule(
"resource://testing-common/TestUtils.sys.mjs"
);
const { setTimeout } = ChromeUtils.importESModule(
"resource://gre/modules/Timer.sys.mjs"
);
let PYTHON;
add_setup(async () => {
PYTHON = await Subprocess.pathSearch(Services.env.get("PYTHON"));
});
// Send a request to the test server. We expect a non-empty response. Any error
// is caught and returned as an empty string.
async function fetchResponseText(url, method) {
const timeoutCtrl = new AbortController();
// We expect the (localhost) server to respond soon. If the server is not
// listening, we will eventually time out. Let's have a generous deadlines:
// - DELETE is given plenty of time, because the request is expected to
// always be accepted, followed by IPC and process termination.
// - GET is given a short deadline, because we expect either an immediate
// response (server is around) or no response (server was closed).
const timeout = method === "DELETE" ? 10000 : 1000;
// eslint-disable-next-line mozilla/no-arbitrary-setTimeout
setTimeout(() => timeoutCtrl.abort(), timeout);
// (^ AbortSignal.timeout() would be nicer but cannot be used, because it
// throws: "The current global does not support timeout".)
try {
let res = await fetch(url, { method, signal: timeoutCtrl.signal });
return await res.text();
} catch (e) {
info(`fetch() request to kill parent failed: ${e}`);
if (
e.name !== "AbortError" &&
e.message !== "NetworkError when attempting to fetch resource."
) {
ok(false, `Unexpected error: ${e}`);
}
return "";
}
}
async function do_test_spawn_parent_with_child(isBreakAwayJob) {
let testCmd;
if (isBreakAwayJob) {
Assert.equal(AppConstants.platform, "win", "Breakaway is Windows-only");
testCmd = "spawn_child_in_breakaway_job_and_exit";
} else {
testCmd = "spawn_child_and_exit";
}
const TEST_SCRIPT = do_get_file("data_test_child.py").path;
info(`Launching proc: ${PYTHON} -u ${TEST_SCRIPT} ${testCmd}`);
const proc = await Subprocess.call({
command: PYTHON,
arguments: ["-u", TEST_SCRIPT, testCmd],
});
const exitPromise = proc.wait();
let exited = false;
exitPromise.then(() => {
exited = true;
});
info(`Spawned process with pid ${proc.pid}, waiting for child`);
// We'll accumulate stdout, and reset what we've received so far after
// checking that the content matches with our expectations.
let stdoutText = "";
let stdoutDonePromise = (async () => {
let seenParentExit = false;
for (let s; (s = await proc.stdout.readString()); ) {
// On Windows, print() uses \r\n instead of \n. Drop it.
s = s.replaceAll("\r", "");
stdoutText += s;
dump(`Received stdout from test script: ${s}\n`);
seenParentExit ||= stdoutText.includes("parent_exit");
if (!seenParentExit) {
Assert.ok(!exited, "Process should not have exited yet");
}
}
})();
const EXPECTED_STDOUT_UNTIL_LISTENING = `\
1. parent_start
2. first_child_start_and_exit
3. parent_after_first_child_exit
4. spawned_child_start
5. Listening at `; // followed by http://127.0.0.1:<digits>\n
const EXPECTED_STDOUT_AT_PARENT_EXIT = `\
6. child_received_http_request
7. data_from_child:kill_parent
8. parent_exit
`;
const EXPECTED_STDOUT_AT_CHILD_EXIT = `\
9. spawned_child_exit
`;
await TestUtils.waitForCondition(
() => stdoutText.startsWith(EXPECTED_STDOUT_UNTIL_LISTENING),
"Waiting for (parent) process to start child with listening HTTP server"
);
const url = stdoutText.replace(EXPECTED_STDOUT_UNTIL_LISTENING, "").trim();
ok(/^http:\/\/127\.0\.0\.1:\d+$/.test(url), `Found server URL: ${url}`);
stdoutText = ""; // Reset now that we have confirmed its content.
// When the server receives this request, it triggers exit of the process.
const promiseResponseToExitParent = fetchResponseText(url, "DELETE");
info("Waiting for spawned (parent) process to exit");
const { exitCode } = await exitPromise;
equal(exitCode, 0, "Got expected exit code");
equal(stdoutText, EXPECTED_STDOUT_AT_PARENT_EXIT, "stdout before exit");
stdoutText = ""; // Reset again after checking its content.
// Now two things can happen:
// - Either the child lives on, and we can send a request to it to ask the
// child to exit as well.
// - Or the process tree rooted at the parent dies, and the child is taken
// with it. The initial fetch() might succeed or reject. A new fetch() is
// most likely going to reject.
// On Linux and macOS, the child appears to live on.
// On Windows, the whole process tree dies, when run under this test
// (but somehow not when the Python script is run in isolation...?).
const responseToExitParent = await promiseResponseToExitParent;
if (!responseToExitParent) {
info("fetch() request to kill parent failed");
}
info("Checking whether the child process is still alive...");
const responseToChildAlive = await fetchResponseText(url, "GET");
const wasChildAlive = !!responseToChildAlive;
if (wasChildAlive) {
// Mainly a sanity check: If the second request gots through, then the
// first request should have received the expected response.
equal(responseToExitParent, "child_process_still_alive_1", "Still alive 1");
equal(responseToChildAlive, "child_process_still_alive_2", "Still alive 2");
} else {
info("fetch() request to check child liveness failed");
}
if (AppConstants.platform === "win" && !isBreakAwayJob) {
ok(!wasChildAlive, "Child process exits when the parent exits");
} else {
ok(wasChildAlive, "Child process outlives parent");
}
await stdoutDonePromise;
// On Windows, we close the pipes as soon as the parent process was detected
// to have exited. This prevents the child's write to be read.
if (wasChildAlive && AppConstants.platform !== "win") {
equal(stdoutText, EXPECTED_STDOUT_AT_CHILD_EXIT, "Stdout from child");
} else {
equal(stdoutText, "", "No more stdout after parent exited (with child)");
}
}
// Tests spawning a process that exits after spawning a child process, and
// verify that the parent process is detected as exited, while the child
// process is still running.
add_task(async function test_spawn_child_outliving_parent_process() {
// Our subprocess implementation (subprocess_win.worker.js) puts the spawned
// process in a job, and cleans up that job upon process exit by calling
// TerminateJobObject. This causes all other (child) processes that are part
// of the job to terminate. To make sure that the child process outlives the
// parent, we have to launch it with the CREATE_BREAKAWAY_FROM_JOB flag.
const isBreakAwayJob = AppConstants.platform == "win";
await do_test_spawn_parent_with_child(isBreakAwayJob);
});
// On Windows, child processes are terminated along with the parent by default,
// because our subprocess implementation puts the process in a Job, and the
// TerminateJobObject call terminates all associated child processes. This
// test confirms the default behavior, as opposed to what we see in
// test_spawn_child_outliving_parent_process.
add_task(
{ skip_if: () => AppConstants.platform != "win" },
async function test_child_terminated_on_process_termination() {
await do_test_spawn_parent_with_child(false);
}
);
add_task(async function test_cleanup() {
let { getSubprocessImplForTest } = ChromeUtils.importESModule(
"resource://gre/modules/Subprocess.sys.mjs"
);
let worker = getSubprocessImplForTest().Process.getWorker();
let openFiles = await worker.call("getOpenFiles", []);
let processes = await worker.call("getProcesses", []);
equal(openFiles.size, 0, "No remaining open files");
equal(processes.size, 0, "No remaining processes");
});

View File

@@ -5,6 +5,7 @@ skip-if = ["os == 'android'"]
subprocess = true
support-files = [
"data_text_file.txt",
"data_test_child.py",
"data_test_script.py",
]
@@ -15,6 +16,8 @@ skip-if = [
]
run-sequentially = "very high failure rate in parallel"
["test_subprocess_child.js"]
["test_subprocess_connectRunning.js"]
skip-if = [
"os == 'mac' && os_version == '11.20' && arch == 'aarch64'", # Bug 1936343