1
0
Fork 0
firefox/toolkit/modules/subprocess/subprocess_unix.worker.js
Daniel Baumann 5e9a113729
Adding upstream version 140.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-25 09:37:52 +02:00

701 lines
18 KiB
JavaScript

/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
/* vim: set sts=2 sw=2 et tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
/* exported Process, ManagedProcess */
/* import-globals-from subprocess_shared.js */
/* import-globals-from subprocess_shared_unix.js */
/* import-globals-from subprocess_worker_common.js */
importScripts(
"resource://gre/modules/subprocess/subprocess_shared.js",
"resource://gre/modules/subprocess/subprocess_shared_unix.js",
"resource://gre/modules/subprocess/subprocess_worker_common.js"
);
const POLL_TIMEOUT = 5000;
let io;
let nextPipeId = 0;
class Pipe extends BasePipe {
constructor(process, fd) {
super();
this.process = process;
this.fd = fd;
this.id = nextPipeId++;
}
get pollEvents() {
throw new Error("Not implemented");
}
/**
* Closes the file descriptor.
*
* @param {boolean} [force=false]
* If true, the file descriptor is closed immediately. If false, the
* file descriptor is closed after all current pending IO operations
* have completed.
*
* @returns {Promise<void>}
* Resolves when the file descriptor has been closed.
*/
close(force = false) {
if (!force && this.pending.length) {
this.closing = true;
return this.closedPromise;
}
for (let { reject } of this.pending) {
let error = new Error("File closed");
error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
reject(error);
}
this.pending.length = 0;
if (!this.closed) {
this.fd.dispose();
this.closed = true;
this.resolveClosed();
io.pipes.delete(this.id);
io.updatePollFds();
}
return this.closedPromise;
}
/**
* Called when an error occurred while polling our file descriptor.
*/
onError() {
this.close(true);
this.process.wait();
}
}
class InputPipe extends Pipe {
/**
* A bit mask of poll() events which we currently wish to be notified of on
* this file descriptor.
*/
get pollEvents() {
if (this.pending.length) {
return LIBC.POLLIN;
}
return 0;
}
/**
* Asynchronously reads at most `length` bytes of binary data from the file
* descriptor into an ArrayBuffer of the same size. Returns a promise which
* resolves when the operation is complete.
*
* @param {integer} length
* The number of bytes to read.
*
* @returns {Promise<ArrayBuffer>}
*/
read(length) {
if (this.closing || this.closed) {
throw new Error("Attempt to read from closed pipe");
}
return new Promise((resolve, reject) => {
this.pending.push({ resolve, reject, length });
io.updatePollFds();
});
}
/**
* Synchronously reads at most `count` bytes of binary data into an
* ArrayBuffer, and returns that buffer. If no data can be read without
* blocking, returns null instead.
*
* @param {integer} count
* The number of bytes to read.
*
* @returns {ArrayBuffer|null}
*/
readBuffer(count) {
let buffer = new ArrayBuffer(count);
let read = +libc.read(this.fd, buffer, buffer.byteLength);
if (read < 0 && ctypes.errno != LIBC.EAGAIN) {
this.onError();
}
if (read <= 0) {
return null;
}
if (read < buffer.byteLength) {
return ArrayBuffer_transfer(buffer, read);
}
return buffer;
}
/**
* Called when one of the IO operations matching the `pollEvents` mask may be
* performed without blocking.
*
* @returns {boolean}
* True if any data was successfully read.
*/
onReady() {
let result = false;
let reads = this.pending;
while (reads.length) {
let { resolve, length } = reads[0];
let buffer = this.readBuffer(length);
if (buffer) {
result = true;
this.shiftPending();
resolve(buffer);
} else {
break;
}
}
if (!reads.length) {
io.updatePollFds();
}
return result;
}
}
class OutputPipe extends Pipe {
/**
* A bit mask of poll() events which we currently wish to be notified of on
* this file discriptor.
*/
get pollEvents() {
if (this.pending.length) {
return LIBC.POLLOUT;
}
return 0;
}
/**
* Asynchronously writes the given buffer to our file descriptor, and returns
* a promise which resolves when the operation is complete.
*
* @param {ArrayBuffer} buffer
* The buffer to write.
*
* @returns {Promise<integer>}
* Resolves to the number of bytes written when the operation is
* complete.
*/
write(buffer) {
if (this.closing || this.closed) {
throw new Error("Attempt to write to closed pipe");
}
return new Promise((resolve, reject) => {
this.pending.push({ resolve, reject, buffer, length: buffer.byteLength });
io.updatePollFds();
});
}
/**
* Attempts to synchronously write the given buffer to our file descriptor.
* Writes only as many bytes as can be written without blocking, and returns
* the number of byes successfully written.
*
* Closes the file descriptor if an IO error occurs.
*
* @param {ArrayBuffer} buffer
* The buffer to write.
*
* @returns {integer}
* The number of bytes successfully written.
*/
writeBuffer(buffer) {
let bytesWritten = libc.write(this.fd, buffer, buffer.byteLength);
if (bytesWritten < 0 && ctypes.errno != LIBC.EAGAIN) {
this.onError();
}
return bytesWritten;
}
/**
* Called when one of the IO operations matching the `pollEvents` mask may be
* performed without blocking.
*/
onReady() {
let writes = this.pending;
while (writes.length) {
let { buffer, resolve, length } = writes[0];
let written = this.writeBuffer(buffer);
if (written == buffer.byteLength) {
resolve(length);
this.shiftPending();
} else if (written > 0) {
writes[0].buffer = buffer.slice(written);
} else {
break;
}
}
if (!writes.length) {
io.updatePollFds();
}
}
}
class Signal {
constructor(fd) {
this.fd = fd;
}
cleanup() {
libc.close(this.fd);
this.fd = null;
}
get pollEvents() {
return LIBC.POLLIN;
}
/**
* Called when an error occurred while polling our file descriptor.
*/
onError() {
io.shutdown();
}
/**
* Called when one of the IO operations matching the `pollEvents` mask may be
* performed without blocking.
*/
onReady() {
let buffer = new ArrayBuffer(16);
let count = +libc.read(this.fd, buffer, buffer.byteLength);
if (count > 0) {
io.messageCount += count;
}
}
}
class Process extends BaseProcess {
/**
* Each Process object opens an additional pipe from the target object, which
* will be automatically closed when the process exits, but otherwise
* carries no data.
*
* This property contains a bit mask of poll() events which we wish to be
* notified of on this descriptor. We're not expecting any input from this
* pipe, but we need to poll for input until the process exits in order to be
* notified when the pipe closes.
*/
get pollEvents() {
if (this.exitCode === null) {
return LIBC.POLLIN;
}
return 0;
}
/**
* Kills the process with the given signal.
*
* @param {integer} signal
*/
kill(signal) {
libc.kill(this.pid, signal);
this.wait();
}
/**
* Initializes the IO pipes for use as standard input, output, and error
* descriptors in the spawned process.
*
* @param {object} options
* The Subprocess options object for this process.
* @returns {unix.Fd[]}
* The array of file descriptors belonging to the spawned process.
*/
initPipes(options) {
let stderr = options.stderr;
let our_pipes = [];
let their_pipes = new Map();
let pipe = input => {
let fds = ctypes.int.array(2)();
let res = libc.pipe(fds);
if (res == -1) {
throw new Error("Unable to create pipe");
}
fds = Array.from(fds, unix.Fd);
if (input) {
fds.reverse();
}
if (input) {
our_pipes.push(new InputPipe(this, fds[1]));
} else {
our_pipes.push(new OutputPipe(this, fds[1]));
}
libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
libc.fcntl(fds[1], LIBC.F_SETFL, LIBC.O_NONBLOCK);
return fds[0];
};
their_pipes.set(0, pipe(false));
their_pipes.set(1, pipe(true));
if (stderr == "pipe") {
their_pipes.set(2, pipe(true));
} else if (stderr == "stdout") {
their_pipes.set(2, their_pipes.get(1));
}
// Create an additional pipe that we can use to monitor for process exit.
their_pipes.set(3, pipe(true));
this.fd = our_pipes.pop().fd;
this.pipes = our_pipes;
return their_pipes;
}
spawn(options) {
let fds = this.initPipes(options);
let launchOptions = {
environment: options.environment,
disclaim: options.disclaim,
fdMap: [],
};
// Check for truthiness to avoid chdir("null")
if (options.workdir) {
launchOptions.workdir = options.workdir;
}
for (let [dst, src] of fds.entries()) {
launchOptions.fdMap.push({ src, dst });
}
try {
this.pid = IOUtils.launchProcess(options.arguments, launchOptions);
} finally {
for (let fd of new Set(fds.values())) {
fd.dispose();
}
}
}
/**
* Called when input is available on our sentinel file descriptor.
*
* @see pollEvents
*/
onReady() {
// We're not actually expecting any input on this pipe. If we get any, we
// can't poll the pipe any further without reading it.
if (this.wait() == undefined) {
this.kill(9);
}
}
/**
* Called when an error occurred while polling our sentinel file descriptor.
*
* @see pollEvents
*/
onError() {
this.wait();
}
/**
* Attempts to wait for the process's exit status, without blocking. If
* successful, resolves the `exitPromise` to the process's exit value.
*
* @returns {integer|null}
* The process's exit status, if it has already exited.
*/
wait() {
if (this.exitCode !== null) {
return this.exitCode;
}
let status = ctypes.int();
let res = libc.waitpid(this.pid, status.address(), LIBC.WNOHANG);
// If there's a failure here and we get any errno other than EINTR, it
// means that the process has been reaped by another thread (most likely
// the nspr process wait thread), and its actual exit status is not
// available to us. In that case, we have to assume success.
if (res == 0 || (res == -1 && ctypes.errno == LIBC.EINTR)) {
return null;
}
let sig = unix.WTERMSIG(status.value);
if (sig) {
this.exitCode = -sig;
} else {
this.exitCode = unix.WEXITSTATUS(status.value);
}
this.fd.dispose();
io.updatePollFds();
this.resolveExit(this.exitCode);
return this.exitCode;
}
}
/*
* Wrapping file descriptors of already-running process to allow interacting
* with the process via the Subprocess module.
*
* This is used e.g. by NativeMessaging code when interfacing with XDG
* WebExtensions portal (required when running under Flatpak or Snap). The
* actual portal binary is executed outside and file descriptors will be shared
* over DBus so interaction can happen with the portal.
*
* The file descriptors are wrapped in a unix.Fd() to ensure proper cleanup as
* long as the ManagedProcess instance is properly disposed off: kill() should
* ensure this is the case.
*
* All the file descriptors will be monitored by poll() to try and detect
* process termination.
* */
class ManagedProcess extends BaseProcess {
/*
* Connect to an already running process that was spawned externally,
* through numeric stdin/stdout/stderr file descriptors.
*
* @param {number[]} receivedFDs
* An array of file descriptors (stdin, stdout and stderr).
*/
connectRunning(receivedFDs) {
const fdCheck = fds => {
for (let value of io.pipes.values()) {
const fd = parseInt(value.fd.toString(), 10);
return fd === fds[0] || fd === fds[1] || fd === fds[2];
}
};
const alreadyUsed = fdCheck(receivedFDs);
if (alreadyUsed) {
throw new Error("Attempt to connect FDs already handled by Subprocess");
}
this.pipes.push(new OutputPipe(this, unix.Fd(receivedFDs[0])));
this.pipes.push(new InputPipe(this, unix.Fd(receivedFDs[1])));
this.pipes.push(new InputPipe(this, unix.Fd(receivedFDs[2])));
}
get pollEvents() {
// No poll fd here: we don't have a handle to the process, only its fd.
// Each fd of this.pipes is already polled by updatePollFds, and their
// Pipe's onError() method calls our wait() method when the fd is closed.
// We assume that the process has exited when all pipes have closed.
// ManagedProcess does not have onReady() or onError() definitions because
// updatePollFds() does not call these when pollEvents is 0.
return 0;
}
/*
* Termination of the ManagedProcess: such a process is started/stopped
* outside of the browser, the termination here ensures that all its pipes are
* properly closed and that any code waiting for the process to terminate is
* unblocked by resolving the Promise.
* */
kill() {
this.pipes.forEach(p => p.close());
this.resolveExit(this.exitCode);
}
/* A ManagedProcess being ran outside of our PID namespace (Snap/Flatpak)
* there is no realistic way to waitpid() here.
* As noted in pollEvents, we consider the process closed if all of its fd
* have closed.
* */
wait() {
if (this.pipes.every(pipe => pipe.closed)) {
// Actual exitCode is unknown, just return null.
this.resolveExit(null);
} else {
io.updatePollFds();
}
}
/*
* A ManagedProcess is already running, so here the spawn just performs the
* connection of the file descriptors received.
*
* @param {array} options
* An array of file descriptors from an existing process.
* */
spawn(options) {
return this.connectRunning(options);
}
}
io = {
pollFds: null,
pollHandlers: null,
pipes: new Map(),
processes: new Map(),
messageCount: 0,
running: true,
polling: false,
init(details) {
this.signal = new Signal(details.signalFd);
this.updatePollFds();
setTimeout(this.loop.bind(this), 0);
},
shutdown() {
if (this.running) {
this.running = false;
this.signal.cleanup();
this.signal = null;
self.postMessage({ msg: "close" });
self.close();
}
},
getPipe(pipeId) {
let pipe = this.pipes.get(pipeId);
if (!pipe) {
let error = new Error("File closed");
error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
throw error;
}
return pipe;
},
getProcess(processId) {
let process = this.processes.get(processId);
if (!process) {
throw new Error(`Invalid process ID: ${processId}`);
}
return process;
},
updatePollFds() {
let handlers = [
this.signal,
...this.pipes.values(),
...this.processes.values(),
];
handlers = handlers.filter(handler => handler.pollEvents);
// Our poll loop is only useful if we've got at least 1 thing to poll other than our own
// signal.
if (handlers.length == 1) {
this.polling = false;
} else if (!this.polling && this.running) {
// Restart the poll loop if necessary:
setTimeout(this.loop.bind(this), 0);
this.polling = true;
}
let pollfds = unix.pollfd.array(handlers.length)();
for (let [i, handler] of handlers.entries()) {
let pollfd = pollfds[i];
pollfd.fd = handler.fd;
pollfd.events = handler.pollEvents;
pollfd.revents = 0;
}
this.pollFds = pollfds;
this.pollHandlers = handlers;
},
loop() {
this.poll();
if (this.running && this.polling) {
setTimeout(this.loop.bind(this), 0);
}
},
poll() {
let handlers = this.pollHandlers;
let pollfds = this.pollFds;
let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
let count = libc.poll(pollfds, pollfds.length, timeout);
for (let i = 0; count && i < pollfds.length; i++) {
let pollfd = pollfds[i];
if (pollfd.revents) {
count--;
let handler = handlers[i];
try {
let success = false;
if (pollfd.revents & handler.pollEvents) {
success = handler.onReady();
}
// Only call the error handler in this iteration if we didn't also
// have a success. This is necessary because Linux systems set POLLHUP
// on a pipe when it's closed but there's still buffered data to be
// read, and Darwin sets POLLIN and POLLHUP on a closed pipe, even
// when there's no data to be read.
if (
!success &&
pollfd.revents & (LIBC.POLLERR | LIBC.POLLHUP | LIBC.POLLNVAL)
) {
handler.onError();
}
} catch (e) {
console.error(e);
debug(`Worker error: ${e} :: ${e.stack}`);
handler.onError();
}
pollfd.revents = 0;
}
}
},
addProcess(process) {
this.processes.set(process.id, process);
for (let pipe of process.pipes) {
this.pipes.set(pipe.id, pipe);
}
},
cleanupProcess(process) {
this.processes.delete(process.id);
},
};