diff --git a/toolkit/components/extensions/test/xpcshell/native_messaging.toml b/toolkit/components/extensions/test/xpcshell/native_messaging.toml index ecd1ebb331d0..0e883ed40bd7 100644 --- a/toolkit/components/extensions/test/xpcshell/native_messaging.toml +++ b/toolkit/components/extensions/test/xpcshell/native_messaging.toml @@ -12,6 +12,8 @@ tags = "webextensions" ["test_ext_native_messaging.js"] run-sequentially = "very high failure rate in parallel" +["test_ext_native_messaging_concurrent.js"] + ["test_ext_native_messaging_perf.js"] skip-if = ["tsan"] # Unreasonably slow, bug 1612707 diff --git a/toolkit/components/extensions/test/xpcshell/test_ext_native_messaging_concurrent.js b/toolkit/components/extensions/test/xpcshell/test_ext_native_messaging_concurrent.js new file mode 100644 index 000000000000..0d992552cdd1 --- /dev/null +++ b/toolkit/components/extensions/test/xpcshell/test_ext_native_messaging_concurrent.js @@ -0,0 +1,107 @@ +/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */ +/* vim: set sts=2 sw=2 et tw=80: */ +"use strict"; + +AddonTestUtils.init(this); + +const ECHO_BODY = String.raw` + import struct + import sys + + stdin = getattr(sys.stdin, 'buffer', sys.stdin) + stdout = getattr(sys.stdout, 'buffer', sys.stdout) + + while True: + rawlen = stdin.read(4) + if len(rawlen) == 0: + sys.exit(0) + msglen = struct.unpack('@I', rawlen)[0] + msg = stdin.read(msglen) + + stdout.write(struct.pack('@I', msglen)) + stdout.write(msg) +`; + +const SCRIPTS = [ + { + name: "echo", + description: "a native app that echoes back messages it receives", + script: ECHO_BODY.replace(/^ {2}/gm, ""), + }, +]; + +function loadTestExtension({ background }) { + return ExtensionTestUtils.loadExtension({ + background, + manifest: { + browser_specific_settings: { gecko: { id: ID } }, + permissions: ["nativeMessaging"], + }, + }); +} + +add_setup(async () => { + await ExtensionTestUtils.startAddonManager(); + await setupHosts(SCRIPTS); +}); + +// Regression test for https://bugzilla.mozilla.org/show_bug.cgi?id=1979546 +add_task(async function test_many_connectNative_calls() { + let extension = loadTestExtension({ + async background() { + // Prior to bug 1979546 being fixed, the test got stuck on Windows at 16. + // Let's verify that things run smoothly even if we launch "many" more, + // e.g. 70 (arbitrarily chosen, higher than 64). + const NUMBER_OF_CONCURRENT_NATIVE_MESSAGING_APPS = 70; + + const openPorts = []; + const remainingMsgs = new Set(); + const firstMessagePromises = []; + for (let i = 0; i < NUMBER_OF_CONCURRENT_NATIVE_MESSAGING_APPS; ++i) { + const dummyMsg = `pingpong-${i}`; + const port = browser.runtime.connectNative("echo"); + openPorts[i] = port; + remainingMsgs.add(i); + firstMessagePromises[i] = new Promise(resolve => { + port.onMessage.addListener(msg => { + browser.test.assertEq(dummyMsg, msg, `Echoed back message ${i}`); + remainingMsgs.delete(i); + browser.test.log(`Remaining: ${Array.from(remainingMsgs)}`); + resolve(); + }); + port.onDisconnect.addListener(() => { + if (remainingMsgs.delete(i)) { + // If the program somehow exits before it responded, note the + // failure and continue (do not stay stuck). + browser.test.fail(`onDisconnect fired before onMessage: ${i}`); + resolve(); + } else { + // onDisconnect should not fire when we call port.disconnect() + // below. + browser.test.fail(`Unexpected port.onDisconnect: ${i}`); + } + }); + }); + port.postMessage(dummyMsg); + } + browser.test.log(`Awaiting replies to: ${Array.from(remainingMsgs)}`); + await Promise.all(firstMessagePromises); + + browser.test.log("Now verifying sendNativeMessage behavior"); + browser.test.assertEq( + await browser.runtime.sendNativeMessage("echo", "one_off_msg"), + "one_off_msg", + "sendNativeMessage can still roundtrip after so many connectNative" + ); + for (const port of openPorts) { + port.disconnect(); + } + browser.test.sendMessage("done"); + }, + }); + await extension.startup(); + await extension.awaitMessage("done"); + info("Waiting for all echo processes to have exit"); + await waitForSubprocessExit(); + await extension.unload(); +}); diff --git a/toolkit/modules/subprocess/subprocess_shared_win.js b/toolkit/modules/subprocess/subprocess_shared_win.js index 964e64712817..9da16ec48be1 100644 --- a/toolkit/modules/subprocess/subprocess_shared_win.js +++ b/toolkit/modules/subprocess/subprocess_shared_win.js @@ -87,6 +87,7 @@ Object.assign(win32, { ERROR_HANDLE_EOF: 38, ERROR_BROKEN_PIPE: 109, ERROR_INSUFFICIENT_BUFFER: 122, + ERROR_ABANDONED_WAIT_0: 735, FILE_ATTRIBUTE_NORMAL: 0x00000080, FILE_FLAG_OVERLAPPED: 0x40000000, @@ -108,10 +109,14 @@ Object.assign(win32, { PROC_THREAD_ATTRIBUTE_HANDLE_LIST: 0x00020002, + JobObjectAssociateCompletionPortInformation: 7, JobObjectBasicLimitInformation: 2, JobObjectExtendedLimitInformation: 9, JOB_OBJECT_LIMIT_BREAKAWAY_OK: 0x00000800, + JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS: 8, + JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO: 4, + JOB_OBJECT_MSG_EXIT_PROCESS: 7, // These constants are 32-bit unsigned integers, but Windows defines // them as negative integers cast to an unsigned type. @@ -120,7 +125,6 @@ Object.assign(win32, { STD_ERROR_HANDLE: -12 + 0x100000000, WAIT_TIMEOUT: 0x00000102, - WAIT_FAILED: 0xffffffff, }); Object.assign(win32, { @@ -150,6 +154,10 @@ Object.assign(win32, { }); Object.assign(win32, { + JOBOBJECT_ASSOCIATE_COMPLETION_PORT: new ctypes.StructType( + "JOBOBJECT_ASSOCIATE_COMPLETION_PORT", + [{ CompletionKey: win32.PVOID }, { CompletionPort: win32.HANDLE }] + ), JOBOBJECT_EXTENDED_LIMIT_INFORMATION: new ctypes.StructType( "JOBOBJECT_EXTENDED_LIMIT_INFORMATION", [ @@ -222,15 +230,6 @@ var libc = new Library("libc", LIBC_CHOICES, { CloseHandle: [win32.WINAPI, win32.BOOL, win32.HANDLE /* hObject */], - CreateEventW: [ - win32.WINAPI, - win32.HANDLE, - win32.SECURITY_ATTRIBUTES.ptr /* opt lpEventAttributes */, - win32.BOOL /* bManualReset */, - win32.BOOL /* bInitialState */, - win32.LPWSTR /* lpName */, - ], - CreateFileW: [ win32.WINAPI, win32.HANDLE, @@ -243,6 +242,15 @@ var libc = new Library("libc", LIBC_CHOICES, { win32.HANDLE /* opt hTemplateFile */, ], + CreateIoCompletionPort: [ + win32.WINAPI, + win32.HANDLE, + win32.HANDLE /* FileHandle */, + win32.HANDLE /* opt ExistingCompletionPort */, + win32.ULONG_PTR /* CompletionKey */, + win32.DWORD /* NumberOfConcurrentThreads */, + ], + CreateJobObjectW: [ win32.WINAPI, win32.HANDLE, @@ -287,15 +295,6 @@ var libc = new Library("libc", LIBC_CHOICES, { win32.PROCESS_INFORMATION.ptr /* out lpProcessInformation */, ], - CreateSemaphoreW: [ - win32.WINAPI, - win32.HANDLE, - win32.SECURITY_ATTRIBUTES.ptr /* opt lpSemaphoreAttributes */, - win32.LONG /* lInitialCount */, - win32.LONG /* lMaximumCount */, - win32.LPCWSTR /* opt lpName */, - ], - DeleteProcThreadAttributeList: [ win32.WINAPI, win32.VOID, @@ -342,6 +341,16 @@ var libc = new Library("libc", LIBC_CHOICES, { win32.BOOL /* bWait */, ], + GetQueuedCompletionStatus: [ + win32.WINAPI, + win32.BOOL, + win32.HANDLE /* CompletionPort */, + win32.LPDWORD /* lpNumberOfBytesTransferred */, + win32.ULONG_PTR.ptr /* out lpCompletionKey */, + win32.OVERLAPPED.ptr.ptr /* out lpOverlapped */, + win32.DWORD /* dwMilliseconds */, + ], + GetStdHandle: [win32.WINAPI, win32.HANDLE, win32.DWORD /* nStdHandle */], InitializeProcThreadAttributeList: [ @@ -353,6 +362,15 @@ var libc = new Library("libc", LIBC_CHOICES, { win32.PSIZE_T /* in/out lpSize */, ], + PostQueuedCompletionStatus: [ + win32.WINAPI, + win32.BOOL, + win32.HANDLE /* CompletionPort */, + win32.DWORD /* dwNumberOfBytesTransferred */, + win32.ULONG_PTR /* dwCompletionKey */, + win32.OVERLAPPED.ptr /* opt lpOverlapped */, + ], + ReadFile: [ win32.WINAPI, win32.BOOL, @@ -363,14 +381,6 @@ var libc = new Library("libc", LIBC_CHOICES, { win32.OVERLAPPED.ptr /* opt in/out lpOverlapped */, ], - ReleaseSemaphore: [ - win32.WINAPI, - win32.BOOL, - win32.HANDLE /* hSemaphore */, - win32.LONG /* lReleaseCount */, - win32.LONG.ptr /* opt out lpPreviousCount */, - ], - ResumeThread: [win32.WINAPI, win32.DWORD, win32.HANDLE /* hThread */], SetInformationJobObject: [ @@ -408,23 +418,6 @@ var libc = new Library("libc", LIBC_CHOICES, { win32.PSIZE_T /* opt lpReturnSize */, ], - WaitForMultipleObjects: [ - win32.WINAPI, - win32.DWORD, - win32.DWORD /* nCount */, - win32.HANDLE.ptr /* hHandles */, - win32.BOOL /* bWaitAll */, - win32.DWORD /* dwMilliseconds */, - ], - - WaitForSingleObject: [ - win32.WINAPI, - win32.DWORD, - win32.HANDLE /* hHandle */, - win32.BOOL /* bWaitAll */, - win32.DWORD /* dwMilliseconds */, - ], - WriteFile: [ win32.WINAPI, win32.BOOL, @@ -436,6 +429,10 @@ var libc = new Library("libc", LIBC_CHOICES, { ], }); +// Custom constant to use as CompletionKey / dwCompletionKey. +// See also IOCPKeyGenin subprocess_win.worker.js. +win32.IOCP_COMPLETION_KEY_WAKE_WORKER = 1; + let nextNamedPipeId = 0; win32.Handle = function (handle) { diff --git a/toolkit/modules/subprocess/subprocess_win.sys.mjs b/toolkit/modules/subprocess/subprocess_win.sys.mjs index ac8443099cd3..6ca18e9d0f6a 100644 --- a/toolkit/modules/subprocess/subprocess_win.sys.mjs +++ b/toolkit/modules/subprocess/subprocess_win.sys.mjs @@ -31,20 +31,35 @@ class WinPromiseWorker extends PromiseWorker { constructor(...args) { super(...args); - this.signalEvent = libc.CreateSemaphoreW(null, 0, 32, null); + // Used by the worker thread to block until any I/O completes. Used on this + // side to unblock the worker to receive messages from postMessage below. + const iocp = libc.CreateIoCompletionPort( + win32.INVALID_HANDLE_VALUE, + win32.NULL_HANDLE_VALUE, + 0, + 1 // The worker thread is the only consumer of IOCP. + ); + if (!iocp) { + throw new Error(`Failed to create IOCP: ${ctypes.winLastError}`); + } + // Wrap in Handle to ensure that CloseHandle is called after worker exits. + this.iocpHandle = win32.Handle(iocp); this.call("init", [ { comspec: Services.env.get("COMSPEC"), - signalEvent: String( - ctypes.cast(this.signalEvent, ctypes.uintptr_t).value - ), + iocpCompletionPort: String(ctypes.cast(iocp, ctypes.uintptr_t).value), }, ]); } signalWorker() { - libc.ReleaseSemaphore(this.signalEvent, 1, null); + libc.PostQueuedCompletionStatus( + this.iocpHandle, + 0, + win32.IOCP_COMPLETION_KEY_WAKE_WORKER, + win32.OVERLAPPED.ptr(0) + ); } postMessage(...args) { diff --git a/toolkit/modules/subprocess/subprocess_win.worker.js b/toolkit/modules/subprocess/subprocess_win.worker.js index b4e431b7722d..8363986bc259 100644 --- a/toolkit/modules/subprocess/subprocess_win.worker.js +++ b/toolkit/modules/subprocess/subprocess_win.worker.js @@ -23,9 +23,52 @@ const TERMINATE_EXIT_CODE = 0x7f; let io; +// We use IOCP to monitor for IO completion on Pipe and process termination: +// - CreateIoCompletionPort with Pipe. +// - JOBOBJECT_ASSOCIATE_COMPLETION_PORT with Process. +// - GetQueuedCompletionStatus to query completion (receives CompletionKey). +// +// The CompletionKey can be an arbitrarily chosen value, used to identify the +// target of the IOCP notification. It is defined as ctypes.uintptr_t, so in +// theory it could be 64 or 32 bit. The number of pipes and processes that we +// create can be represented in fewer than 32 bits. We therefore use the higher +// order bits to tag the ID to distinguish Pipes, Processes and custom IOCP +// messages (e.g. IOCP_COMPLETION_KEY_WAKE_WORKER) from each other. +class IOCPKeyGen { + // The IOCP_KEY_IS_PIPE and IOCP_KEY_IS_PROC bits are mutually exclusive. + static IOCP_KEY_IS_PIPE = 1 << 31; + static IOCP_KEY_IS_PROC = 1 << 30; + + static keyForPipe(pipe) { + // pipe.id starts at 0 (nextPipeId in this file). + return (IOCPKeyGen.IOCP_KEY_IS_PIPE | pipe.id) >>> 0; + } + static keyForProcess(process) { + // process.id starts at 0 (nextProcessId++ in subprocess_worker_common.js). + return (IOCPKeyGen.IOCP_KEY_IS_PROC | process.id) >>> 0; + } + + static isPipeKey(completionKey) { + return !!(IOCPKeyGen.IOCP_KEY_IS_PIPE & completionKey); + } + static isProcessKey(completionKey) { + return !!(IOCPKeyGen.IOCP_KEY_IS_PROC & completionKey); + } + + // Only use this if isPipeKey(completionKey) is true: + static pipeIdFromKey(completionKey) { + return (~IOCPKeyGen.IOCP_KEY_IS_PIPE & completionKey) >>> 0; + } + // Only use this if isProcessKey(completionKey) is true: + static processIdFromKey(completionKey) { + return (~IOCPKeyGen.IOCP_KEY_IS_PROC & completionKey) >>> 0; + } +} + let nextPipeId = 0; class Pipe extends BasePipe { + // origHandle MUST be opened with the FILE_FLAG_OVERLAPPED flag. constructor(process, origHandle) { super(); @@ -49,21 +92,24 @@ class Pipe extends BasePipe { this.handle = win32.Handle(handle); - let event = libc.CreateEventW(null, false, false, null); - this.overlapped = win32.OVERLAPPED(); - this.overlapped.hEvent = event; - this._event = win32.Handle(event); + let ok = libc.CreateIoCompletionPort( + handle, + io.iocpCompletionPort, + IOCPKeyGen.keyForPipe(this), + 0 // Ignored. + ); + if (!ok) { + // Truly unexpected. We won't be able to observe IO on this Pipe. + debug(`Failed to associate IOCP: ${ctypes.winLastError}`); + } this.buffer = null; } - get event() { - if (this.pending.length) { - return this._event; - } - return null; + hasPendingIO() { + return !!this.pending.length; } maybeClose() {} @@ -96,7 +142,6 @@ class Pipe extends BasePipe { if (!this.closed) { this.handle.dispose(); - this._event.dispose(); io.pipes.delete(this.id); @@ -327,25 +372,6 @@ class OutputPipe extends Pipe { } } -class Signal { - constructor(event) { - this.event = event; - } - - cleanup() { - libc.CloseHandle(this.event); - this.event = null; - } - - onError() { - io.shutdown(); - } - - onReady() { - io.messageCount += 1; - } -} - class Process extends BaseProcess { constructor(...args) { super(...args); @@ -353,14 +379,6 @@ class Process extends BaseProcess { this.killed = false; } - /** - * Returns our process handle for use as an event in a WaitForMultipleObjects - * call. - */ - get event() { - return this.handle; - } - /** * Forcibly terminates the process. */ @@ -572,9 +590,29 @@ class Process extends BaseProcess { ctypes.cast(info.address(), ctypes.voidptr_t), info.constructor.size ); - errorMessage = `Failed to set job limits: 0x${( - ctypes.winLastError || 0 - ).toString(16)}`; + if (!ok) { + errorMessage = `Failed to set job limits: 0x${( + ctypes.winLastError || 0 + ).toString(16)}`; + } + } + + if (ok) { + let acp = win32.JOBOBJECT_ASSOCIATE_COMPLETION_PORT(); + acp.CompletionKey = win32.PVOID(IOCPKeyGen.keyForProcess(this)); + acp.CompletionPort = io.iocpCompletionPort; + + ok = libc.SetInformationJobObject( + this.jobHandle, + win32.JobObjectAssociateCompletionPortInformation, + ctypes.cast(acp.address(), ctypes.voidptr_t), + acp.constructor.size + ); + if (!ok) { + errorMessage = `Failed to set IOCP: 0x${( + ctypes.winLastError || 0 + ).toString(16)}`; + } } if (ok) { @@ -644,6 +682,9 @@ class Process extends BaseProcess { this.handle.dispose(); this.handle = null; + // This also terminates all child processes under this process, unless + // the child process was created with the CREATE_BREAKAWAY_FROM_JOB flag, + // in which case it would not be part of our job (and therefore survive). libc.TerminateJobObject(this.jobHandle, TERMINATE_EXIT_CODE); this.jobHandle.dispose(); this.jobHandle = null; @@ -660,8 +701,7 @@ class Process extends BaseProcess { } io = { - events: null, - eventHandlers: null, + iocpCompletionPort: null, pipes: new Map(), @@ -676,11 +716,12 @@ io = { init(details) { this.comspec = details.comspec; - let signalEvent = ctypes.cast( - ctypes.uintptr_t(details.signalEvent), + // Note: not wrapped in win32.Handle - parent thread is responsible for + // releasing the resource when this thread terminates. + this.iocpCompletionPort = ctypes.cast( + ctypes.uintptr_t(details.iocpCompletionPort), win32.HANDLE ); - this.signal = new Signal(signalEvent); this.updatePollEvents(); setTimeout(this.loop.bind(this), 0); @@ -690,9 +731,6 @@ io = { if (this.running) { this.running = false; - this.signal.cleanup(); - this.signal = null; - self.postMessage({ msg: "close" }); self.close(); } @@ -719,28 +757,29 @@ io = { }, updatePollEvents() { - let handlers = [ - this.signal, - ...this.pipes.values(), - ...this.processes.values(), - ]; - - handlers = handlers.filter(handler => handler.event); + let shouldPoll = false; + if (this.processes.size) { + // As long as the process is alive, it may notify IOCP. + // When the process exits, we'll remove it from io.processes. + shouldPoll = true; + } else { + for (let pipe of this.pipes.values()) { + if (pipe.hasPendingIO()) { + shouldPoll = true; + break; + } + } + } // Our poll loop is only useful if we've got at least 1 thing to poll other than our own // signal. - if (handlers.length == 1) { + if (!shouldPoll) { this.polling = false; } else if (!this.polling && this.running) { // Restart the poll loop if necessary: setTimeout(this.loop.bind(this), 0); this.polling = true; } - - this.eventHandlers = handlers; - - let handles = handlers.map(handler => handler.event); - this.events = win32.HANDLE.array()(handles); }, loop() { @@ -751,28 +790,108 @@ io = { }, poll() { + // On the first call, wait until any IO signaled. After that, immediately + // process all other IO that have signaled in the meantime. let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT; for (; ; timeout = 0) { - let events = this.events; - let handlers = this.eventHandlers; - - let result = libc.WaitForMultipleObjects( - events.length, - events, - false, + let numberOfBytesTransferred = win32.DWORD(); + let completionKeyOut = win32.ULONG_PTR(); + let lpOverlapped = win32.OVERLAPPED.ptr(0); + let ok = libc.GetQueuedCompletionStatus( + io.iocpCompletionPort, + numberOfBytesTransferred.address(), + completionKeyOut.address(), + lpOverlapped.address(), timeout ); - if (result < handlers.length) { + const deqWinErr = ok ? 0 : ctypes.winLastError; + if (!ok) { + if (deqWinErr === win32.WAIT_TIMEOUT) { + // No changes, return (caller may schedule another loop/poll). + break; + } + if (deqWinErr === win32.ERROR_ABANDONED_WAIT_0) { + // iocpCompletionPort was closed. + io.shutdown(); + break; + } + if (lpOverlapped.isNull()) { + // "If *lpOverlapped is NULL, the function did not dequeue a + // completion packet from the completion port." + // No remaining data, return (caller may schedule another loop/poll). + break; + } + // Received completion packet that failed, fall through. + } + let completionKey = parseInt(completionKeyOut.value, 10); + if (completionKey === win32.IOCP_COMPLETION_KEY_WAKE_WORKER) { + // Custom notification from the parent thread. + io.messageCount += 1; + // Continue to process any completed IO, then return (and eventually + // yield to the event loop to allow onmessage to receive messages). + continue; + } + if (IOCPKeyGen.isPipeKey(completionKey)) { + const pipeId = IOCPKeyGen.pipeIdFromKey(completionKey); + const pipe = io.pipes.get(pipeId); + if (!pipe) { + debug(`IOCP notification for unknown pipe: ${pipeId}`); + continue; + } + if (deqWinErr === win32.ERROR_BROKEN_PIPE) { + pipe.onError(); + continue; + } try { - handlers[result].onReady(); + pipe.onReady(); } catch (e) { console.error(e); debug(`Worker error: ${e} :: ${e.stack}`); - handlers[result].onError(); + pipe.onError(); + } + } else if (IOCPKeyGen.isProcessKey(completionKey)) { + // This is a notification via JOBOBJECT_ASSOCIATE_COMPLETION_PORT. + // "numberOfBytesTransferred" is any of the JOB_OBJECT_MSG_* values. + // https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-jobobject_associate_completion_port + const jobMsgId = numberOfBytesTransferred.value; + const processId = IOCPKeyGen.processIdFromKey(completionKey); + + // The job reaching process count zero is a very strong indication that + // the process has exit. We also listen to the other *EXIT_PROCESS + // messages in case the process spawned child processes under the job, + // because that would suppress the JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO + // notification and prevent us from detecting process termination. + const isExit = + jobMsgId === win32.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS || + jobMsgId === win32.JOB_OBJECT_MSG_EXIT_PROCESS; + const isJobZero = jobMsgId === win32.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO; + if (!isExit && !isJobZero) { + // Ignore non-exit notifications, such as additional new processes. + continue; + } + const process = io.processes.get(processId); + if (!process) { + // This may happen when isJobZero == true, because we could have + // observed isExit == true before. + continue; + } + if (isExit) { + let realPid = ctypes.cast(lpOverlapped, win32.DWORD).value; + if (process.pid !== realPid) { + // A random child process (spawned by the process) has exited. + continue; + } + } + try { + process.onReady(); + } catch (e) { + // This is really unexpected, but don't break the poll loop. + console.error(e); + debug(`Worker error: ${e} :: ${e.stack}`); } } else { - break; + debug(`Unexpected IOCP CompletionKey: ${completionKey}`); } } }, diff --git a/toolkit/modules/subprocess/test/xpcshell/data_test_child.py b/toolkit/modules/subprocess/test/xpcshell/data_test_child.py new file mode 100644 index 000000000000..8464fb515179 --- /dev/null +++ b/toolkit/modules/subprocess/test/xpcshell/data_test_child.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 + +# This file is run by test_subprocess_child.js. To test this file in isolation: +# python3 -u toolkit/modules/subprocess/test/xpcshell/data_test_child.py spawn_child_and_exit +# Then separately, to the displayed URL "Listening at http://127.0.0.1:12345", +# with 12345 being a random port, +# curl http://127.0.0.1:12345 -X DELETE # request exit of parent +# curl http://127.0.0.1:12345 # request exit of child + +import sys +import time + + +def sleep_for_a_little_bit(): + time.sleep(0.2) + + +def spawn_child_and_exit(is_breakaway_job): + """ + Spawns and exits child processes to allow tests to verify that they detect + specifically the exit of this (parent) process. + + The expected sequence of outputs is as follows: + 1. parent_start + 2. first_child_start_and_exit + 3. parent_after_first_child_exit + 4. spawned_child_start + 5. Listening at http://127.0.0.1:12345 - with 12345 being random port + 6. child_received_http_request - DELETE request from test. + 7. data_from_child:kill_parent + 8. parent_exit + ( now the parent has exit) + ( child_process_still_alive_1 response sent to request from step 6) + ( wait for new request from client to request child to exit ) + ( child_process_still_alive_2 response sent to that new request ) + 9. spawned_child_exit + """ + import subprocess + + print("1. parent_start", flush=True) + + # Start and exit a child process (used to make sure that we do not + # mistakenly detect an exited child process for the parent process). + subprocess.run( + [sys.executable, "-c", "print('2. first_child_start_and_exit')"], + stdout=sys.stdout, + stderr=sys.stderr, + ) + # Wait a bit to make sure that the child's exit signal has been processed. + # This is not strictly needed, because we don't expect the child to affect + # the parent, but in case of a flawed implementation, this would enable the + # test to detect a bad implementation (by observing the exit before the + # "parent_after_first_child_exit" message below). + sleep_for_a_little_bit() + print("3. parent_after_first_child_exit", flush=True) + + creationflags = 0 + if is_breakaway_job: + # See comment in test_subprocess_child.js; in short we need this flag + # to make sure that the child outlives the parent when the subprocess + # implementation calls TerminateJobObject. + creationflags = subprocess.CREATE_BREAKAWAY_FROM_JOB + child_proc = subprocess.Popen( + [sys.executable, "-u", __file__, "spawned_child"], + creationflags=creationflags, + # We don't need this pipe, but when this side of the process exits, + # the pipe is closed, which the child can use to detect that the parent + # has exit. + stdin=subprocess.PIPE, + # We are using stdout as a control channel to allow the child to + # notify us when the process is done. + stdout=subprocess.PIPE, + # stderr is redirected to the real stdout, so that the caller can + # still observe print() from spawned_child. + stderr=sys.stdout, + ) + + # This blocks until the child has notified us. + data_from_child = child_proc.stdout.readline().decode().rstrip() + print(f"7. data_from_child:{data_from_child}", flush=True) + print("8. parent_exit", flush=True) + + # Wait a little bit to make sure that stdout has been flushed (and read by + # the caller) when the process exits. + sleep_for_a_little_bit() + sys.exit(0) + + +def spawned_child(): + import http.server + import socketserver + + def print_to_parent_stdout(msg): + # The parent maps our stderr to its stdout. + print(msg, flush=True, file=sys.stderr) + + # This is spawned via spawn_child_and_exit. + print_to_parent_stdout("4. spawned_child_start") + + class RequestHandler(http.server.BaseHTTPRequestHandler): + def log_message(self, *args): + pass # Disable logging + + def do_DELETE(self): + print_to_parent_stdout("6. child_received_http_request") + # Let the caller know that we are responsive. + self.send_response(200) + self.send_header("Connection", "close") + self.end_headers() + + # Wait a little bit to allow the network request to be + # processed by the client. If for some reason the termination + # of the parent also kills the child, then at least the client + # has had a chance to become aware of it. + sleep_for_a_little_bit() + + # Now ask the parent to exit, and continue here. + print("kill_parent", flush=True) + # When the parent exits, stdin closes, which we detect here: + res = sys.stdin.read(1) + if len(res): + print_to_parent_stdout("spawned_child_UNEXPECTED_STDIN") + + # If we make it here, it means that this child outlived the + # parent, and we can let the client know. + # (if the child process is terminated prematurely, the client + # would also know through a disconnected socket). + self.wfile.write(b"child_process_still_alive_1") + + def do_GET(self): + self.send_response(200) + self.send_header("Connection", "close") + self.end_headers() + self.wfile.write(b"child_process_still_alive_2") + + # Starts a server that handles two requests and then closes the server. + with socketserver.TCPServer(("127.0.0.1", 0), RequestHandler) as server: + host, port = server.server_address[:2] + print_to_parent_stdout(f"5. Listening at http://{host}:{port}") + + # Expecting DELETE request (do_DELETE) + server.handle_request() + + # Expecting GET request (do_GET) + server.handle_request() + + print_to_parent_stdout("9. spawned_child_exit") + sys.exit(0) + + +cmd = sys.argv[1] +if cmd == "spawn_child_and_exit": + spawn_child_and_exit(is_breakaway_job=False) +elif cmd == "spawn_child_in_breakaway_job_and_exit": + spawn_child_and_exit(is_breakaway_job=True) +elif cmd == "spawned_child": + spawned_child() +else: + raise Exception(f"Unknown command: {cmd}") diff --git a/toolkit/modules/subprocess/test/xpcshell/test_subprocess_child.js b/toolkit/modules/subprocess/test/xpcshell/test_subprocess_child.js new file mode 100644 index 000000000000..636a63e4fc5c --- /dev/null +++ b/toolkit/modules/subprocess/test/xpcshell/test_subprocess_child.js @@ -0,0 +1,203 @@ +"use strict"; + +const { TestUtils } = ChromeUtils.importESModule( + "resource://testing-common/TestUtils.sys.mjs" +); +const { setTimeout } = ChromeUtils.importESModule( + "resource://gre/modules/Timer.sys.mjs" +); + +let PYTHON; + +add_setup(async () => { + PYTHON = await Subprocess.pathSearch(Services.env.get("PYTHON")); +}); + +// Send a request to the test server. We expect a non-empty response. Any error +// is caught and returned as an empty string. +async function fetchResponseText(url, method) { + const timeoutCtrl = new AbortController(); + + // We expect the (localhost) server to respond soon. If the server is not + // listening, we will eventually time out. Let's have a generous deadlines: + // - DELETE is given plenty of time, because the request is expected to + // always be accepted, followed by IPC and process termination. + // - GET is given a short deadline, because we expect either an immediate + // response (server is around) or no response (server was closed). + const timeout = method === "DELETE" ? 10000 : 1000; + // eslint-disable-next-line mozilla/no-arbitrary-setTimeout + setTimeout(() => timeoutCtrl.abort(), timeout); + // (^ AbortSignal.timeout() would be nicer but cannot be used, because it + // throws: "The current global does not support timeout".) + + try { + let res = await fetch(url, { method, signal: timeoutCtrl.signal }); + return await res.text(); + } catch (e) { + info(`fetch() request to kill parent failed: ${e}`); + if ( + e.name !== "AbortError" && + e.message !== "NetworkError when attempting to fetch resource." + ) { + ok(false, `Unexpected error: ${e}`); + } + return ""; + } +} + +async function do_test_spawn_parent_with_child(isBreakAwayJob) { + let testCmd; + if (isBreakAwayJob) { + Assert.equal(AppConstants.platform, "win", "Breakaway is Windows-only"); + testCmd = "spawn_child_in_breakaway_job_and_exit"; + } else { + testCmd = "spawn_child_and_exit"; + } + const TEST_SCRIPT = do_get_file("data_test_child.py").path; + info(`Launching proc: ${PYTHON} -u ${TEST_SCRIPT} ${testCmd}`); + const proc = await Subprocess.call({ + command: PYTHON, + arguments: ["-u", TEST_SCRIPT, testCmd], + }); + const exitPromise = proc.wait(); + let exited = false; + exitPromise.then(() => { + exited = true; + }); + + info(`Spawned process with pid ${proc.pid}, waiting for child`); + + // We'll accumulate stdout, and reset what we've received so far after + // checking that the content matches with our expectations. + let stdoutText = ""; + let stdoutDonePromise = (async () => { + let seenParentExit = false; + for (let s; (s = await proc.stdout.readString()); ) { + // On Windows, print() uses \r\n instead of \n. Drop it. + s = s.replaceAll("\r", ""); + stdoutText += s; + dump(`Received stdout from test script: ${s}\n`); + + seenParentExit ||= stdoutText.includes("parent_exit"); + if (!seenParentExit) { + Assert.ok(!exited, "Process should not have exited yet"); + } + } + })(); + + const EXPECTED_STDOUT_UNTIL_LISTENING = `\ +1. parent_start +2. first_child_start_and_exit +3. parent_after_first_child_exit +4. spawned_child_start +5. Listening at `; // followed by http://127.0.0.1:\n + const EXPECTED_STDOUT_AT_PARENT_EXIT = `\ +6. child_received_http_request +7. data_from_child:kill_parent +8. parent_exit +`; + const EXPECTED_STDOUT_AT_CHILD_EXIT = `\ +9. spawned_child_exit +`; + + await TestUtils.waitForCondition( + () => stdoutText.startsWith(EXPECTED_STDOUT_UNTIL_LISTENING), + "Waiting for (parent) process to start child with listening HTTP server" + ); + + const url = stdoutText.replace(EXPECTED_STDOUT_UNTIL_LISTENING, "").trim(); + ok(/^http:\/\/127\.0\.0\.1:\d+$/.test(url), `Found server URL: ${url}`); + stdoutText = ""; // Reset now that we have confirmed its content. + + // When the server receives this request, it triggers exit of the process. + const promiseResponseToExitParent = fetchResponseText(url, "DELETE"); + + info("Waiting for spawned (parent) process to exit"); + const { exitCode } = await exitPromise; + equal(exitCode, 0, "Got expected exit code"); + equal(stdoutText, EXPECTED_STDOUT_AT_PARENT_EXIT, "stdout before exit"); + stdoutText = ""; // Reset again after checking its content. + + // Now two things can happen: + // - Either the child lives on, and we can send a request to it to ask the + // child to exit as well. + // - Or the process tree rooted at the parent dies, and the child is taken + // with it. The initial fetch() might succeed or reject. A new fetch() is + // most likely going to reject. + // On Linux and macOS, the child appears to live on. + // On Windows, the whole process tree dies, when run under this test + // (but somehow not when the Python script is run in isolation...?). + + const responseToExitParent = await promiseResponseToExitParent; + if (!responseToExitParent) { + info("fetch() request to kill parent failed"); + } + + info("Checking whether the child process is still alive..."); + const responseToChildAlive = await fetchResponseText(url, "GET"); + const wasChildAlive = !!responseToChildAlive; + + if (wasChildAlive) { + // Mainly a sanity check: If the second request gots through, then the + // first request should have received the expected response. + equal(responseToExitParent, "child_process_still_alive_1", "Still alive 1"); + equal(responseToChildAlive, "child_process_still_alive_2", "Still alive 2"); + } else { + info("fetch() request to check child liveness failed"); + } + + if (AppConstants.platform === "win" && !isBreakAwayJob) { + ok(!wasChildAlive, "Child process exits when the parent exits"); + } else { + ok(wasChildAlive, "Child process outlives parent"); + } + + await stdoutDonePromise; + + // On Windows, we close the pipes as soon as the parent process was detected + // to have exited. This prevents the child's write to be read. + if (wasChildAlive && AppConstants.platform !== "win") { + equal(stdoutText, EXPECTED_STDOUT_AT_CHILD_EXIT, "Stdout from child"); + } else { + equal(stdoutText, "", "No more stdout after parent exited (with child)"); + } +} + +// Tests spawning a process that exits after spawning a child process, and +// verify that the parent process is detected as exited, while the child +// process is still running. +add_task(async function test_spawn_child_outliving_parent_process() { + // Our subprocess implementation (subprocess_win.worker.js) puts the spawned + // process in a job, and cleans up that job upon process exit by calling + // TerminateJobObject. This causes all other (child) processes that are part + // of the job to terminate. To make sure that the child process outlives the + // parent, we have to launch it with the CREATE_BREAKAWAY_FROM_JOB flag. + const isBreakAwayJob = AppConstants.platform == "win"; + await do_test_spawn_parent_with_child(isBreakAwayJob); +}); + +// On Windows, child processes are terminated along with the parent by default, +// because our subprocess implementation puts the process in a Job, and the +// TerminateJobObject call terminates all associated child processes. This +// test confirms the default behavior, as opposed to what we see in +// test_spawn_child_outliving_parent_process. +add_task( + { skip_if: () => AppConstants.platform != "win" }, + async function test_child_terminated_on_process_termination() { + await do_test_spawn_parent_with_child(false); + } +); + +add_task(async function test_cleanup() { + let { getSubprocessImplForTest } = ChromeUtils.importESModule( + "resource://gre/modules/Subprocess.sys.mjs" + ); + + let worker = getSubprocessImplForTest().Process.getWorker(); + + let openFiles = await worker.call("getOpenFiles", []); + let processes = await worker.call("getProcesses", []); + + equal(openFiles.size, 0, "No remaining open files"); + equal(processes.size, 0, "No remaining processes"); +}); diff --git a/toolkit/modules/subprocess/test/xpcshell/xpcshell.toml b/toolkit/modules/subprocess/test/xpcshell/xpcshell.toml index 60c16028364f..c0f67d9afb2b 100644 --- a/toolkit/modules/subprocess/test/xpcshell/xpcshell.toml +++ b/toolkit/modules/subprocess/test/xpcshell/xpcshell.toml @@ -5,6 +5,7 @@ skip-if = ["os == 'android'"] subprocess = true support-files = [ "data_text_file.txt", + "data_test_child.py", "data_test_script.py", ] @@ -15,6 +16,8 @@ skip-if = [ ] run-sequentially = "very high failure rate in parallel" +["test_subprocess_child.js"] + ["test_subprocess_connectRunning.js"] skip-if = [ "os == 'mac' && os_version == '11.20' && arch == 'aarch64'", # Bug 1936343