diff options
Diffstat (limited to 'toolkit/modules/subprocess/subprocess_worker_unix.js')
-rw-r--r-- | toolkit/modules/subprocess/subprocess_worker_unix.js | 612 |
1 files changed, 612 insertions, 0 deletions
diff --git a/toolkit/modules/subprocess/subprocess_worker_unix.js b/toolkit/modules/subprocess/subprocess_worker_unix.js new file mode 100644 index 0000000000..74df22f349 --- /dev/null +++ b/toolkit/modules/subprocess/subprocess_worker_unix.js @@ -0,0 +1,612 @@ +/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */ +/* vim: set sts=2 sw=2 et tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +/* eslint-env mozilla/chrome-worker */ +/* exported Process */ + +/* import-globals-from subprocess_shared.js */ +/* import-globals-from subprocess_shared_unix.js */ +/* import-globals-from subprocess_worker_common.js */ +importScripts( + "resource://gre/modules/subprocess/subprocess_shared.js", + "resource://gre/modules/subprocess/subprocess_shared_unix.js", + "resource://gre/modules/subprocess/subprocess_worker_common.js" +); + +const POLL_TIMEOUT = 5000; + +let io; + +let nextPipeId = 0; + +class Pipe extends BasePipe { + constructor(process, fd) { + super(); + + this.process = process; + this.fd = fd; + this.id = nextPipeId++; + } + + get pollEvents() { + throw new Error("Not implemented"); + } + + /** + * Closes the file descriptor. + * + * @param {boolean} [force=false] + * If true, the file descriptor is closed immediately. If false, the + * file descriptor is closed after all current pending IO operations + * have completed. + * + * @returns {Promise<void>} + * Resolves when the file descriptor has been closed. + */ + close(force = false) { + if (!force && this.pending.length) { + this.closing = true; + return this.closedPromise; + } + + for (let { reject } of this.pending) { + let error = new Error("File closed"); + error.errorCode = SubprocessConstants.ERROR_END_OF_FILE; + reject(error); + } + this.pending.length = 0; + + if (!this.closed) { + this.fd.dispose(); + + this.closed = true; + this.resolveClosed(); + + io.pipes.delete(this.id); + io.updatePollFds(); + } + return this.closedPromise; + } + + /** + * Called when an error occurred while polling our file descriptor. + */ + onError() { + this.close(true); + this.process.wait(); + } +} + +class InputPipe extends Pipe { + /** + * A bit mask of poll() events which we currently wish to be notified of on + * this file descriptor. + */ + get pollEvents() { + if (this.pending.length) { + return LIBC.POLLIN; + } + return 0; + } + + /** + * Asynchronously reads at most `length` bytes of binary data from the file + * descriptor into an ArrayBuffer of the same size. Returns a promise which + * resolves when the operation is complete. + * + * @param {integer} length + * The number of bytes to read. + * + * @returns {Promise<ArrayBuffer>} + */ + read(length) { + if (this.closing || this.closed) { + throw new Error("Attempt to read from closed pipe"); + } + + return new Promise((resolve, reject) => { + this.pending.push({ resolve, reject, length }); + io.updatePollFds(); + }); + } + + /** + * Synchronously reads at most `count` bytes of binary data into an + * ArrayBuffer, and returns that buffer. If no data can be read without + * blocking, returns null instead. + * + * @param {integer} count + * The number of bytes to read. + * + * @returns {ArrayBuffer|null} + */ + readBuffer(count) { + let buffer = new ArrayBuffer(count); + + let read = +libc.read(this.fd, buffer, buffer.byteLength); + if (read < 0 && ctypes.errno != LIBC.EAGAIN) { + this.onError(); + } + + if (read <= 0) { + return null; + } + + if (read < buffer.byteLength) { + return ArrayBuffer_transfer(buffer, read); + } + + return buffer; + } + + /** + * Called when one of the IO operations matching the `pollEvents` mask may be + * performed without blocking. + * + * @returns {boolean} + * True if any data was successfully read. + */ + onReady() { + let result = false; + let reads = this.pending; + while (reads.length) { + let { resolve, length } = reads[0]; + + let buffer = this.readBuffer(length); + if (buffer) { + result = true; + this.shiftPending(); + resolve(buffer); + } else { + break; + } + } + + if (!reads.length) { + io.updatePollFds(); + } + return result; + } +} + +class OutputPipe extends Pipe { + /** + * A bit mask of poll() events which we currently wish to be notified of on + * this file discriptor. + */ + get pollEvents() { + if (this.pending.length) { + return LIBC.POLLOUT; + } + return 0; + } + + /** + * Asynchronously writes the given buffer to our file descriptor, and returns + * a promise which resolves when the operation is complete. + * + * @param {ArrayBuffer} buffer + * The buffer to write. + * + * @returns {Promise<integer>} + * Resolves to the number of bytes written when the operation is + * complete. + */ + write(buffer) { + if (this.closing || this.closed) { + throw new Error("Attempt to write to closed pipe"); + } + + return new Promise((resolve, reject) => { + this.pending.push({ resolve, reject, buffer, length: buffer.byteLength }); + io.updatePollFds(); + }); + } + + /** + * Attempts to synchronously write the given buffer to our file descriptor. + * Writes only as many bytes as can be written without blocking, and returns + * the number of byes successfully written. + * + * Closes the file descriptor if an IO error occurs. + * + * @param {ArrayBuffer} buffer + * The buffer to write. + * + * @returns {integer} + * The number of bytes successfully written. + */ + writeBuffer(buffer) { + let bytesWritten = libc.write(this.fd, buffer, buffer.byteLength); + + if (bytesWritten < 0 && ctypes.errno != LIBC.EAGAIN) { + this.onError(); + } + + return bytesWritten; + } + + /** + * Called when one of the IO operations matching the `pollEvents` mask may be + * performed without blocking. + */ + onReady() { + let writes = this.pending; + while (writes.length) { + let { buffer, resolve, length } = writes[0]; + + let written = this.writeBuffer(buffer); + + if (written == buffer.byteLength) { + resolve(length); + this.shiftPending(); + } else if (written > 0) { + writes[0].buffer = buffer.slice(written); + } else { + break; + } + } + + if (!writes.length) { + io.updatePollFds(); + } + } +} + +class Signal { + constructor(fd) { + this.fd = fd; + } + + cleanup() { + libc.close(this.fd); + this.fd = null; + } + + get pollEvents() { + return LIBC.POLLIN; + } + + /** + * Called when an error occurred while polling our file descriptor. + */ + onError() { + io.shutdown(); + } + + /** + * Called when one of the IO operations matching the `pollEvents` mask may be + * performed without blocking. + */ + onReady() { + let buffer = new ArrayBuffer(16); + let count = +libc.read(this.fd, buffer, buffer.byteLength); + if (count > 0) { + io.messageCount += count; + } + } +} + +class Process extends BaseProcess { + /** + * Each Process object opens an additional pipe from the target object, which + * will be automatically closed when the process exits, but otherwise + * carries no data. + * + * This property contains a bit mask of poll() events which we wish to be + * notified of on this descriptor. We're not expecting any input from this + * pipe, but we need to poll for input until the process exits in order to be + * notified when the pipe closes. + */ + get pollEvents() { + if (this.exitCode === null) { + return LIBC.POLLIN; + } + return 0; + } + + /** + * Kills the process with the given signal. + * + * @param {integer} signal + */ + kill(signal) { + libc.kill(this.pid, signal); + this.wait(); + } + + /** + * Initializes the IO pipes for use as standard input, output, and error + * descriptors in the spawned process. + * + * @param {object} options + * The Subprocess options object for this process. + * @returns {unix.Fd[]} + * The array of file descriptors belonging to the spawned process. + */ + initPipes(options) { + let stderr = options.stderr; + + let our_pipes = []; + let their_pipes = new Map(); + + let pipe = input => { + let fds = ctypes.int.array(2)(); + + let res = libc.pipe(fds); + if (res == -1) { + throw new Error("Unable to create pipe"); + } + + fds = Array.from(fds, unix.Fd); + + if (input) { + fds.reverse(); + } + + if (input) { + our_pipes.push(new InputPipe(this, fds[1])); + } else { + our_pipes.push(new OutputPipe(this, fds[1])); + } + + libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC); + libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC); + libc.fcntl(fds[1], LIBC.F_SETFL, LIBC.O_NONBLOCK); + + return fds[0]; + }; + + their_pipes.set(0, pipe(false)); + their_pipes.set(1, pipe(true)); + + if (stderr == "pipe") { + their_pipes.set(2, pipe(true)); + } else if (stderr == "stdout") { + their_pipes.set(2, their_pipes.get(1)); + } + + // Create an additional pipe that we can use to monitor for process exit. + their_pipes.set(3, pipe(true)); + this.fd = our_pipes.pop().fd; + + this.pipes = our_pipes; + + return their_pipes; + } + + spawn(options) { + let fds = this.initPipes(options); + + let launchOptions = { + environment: options.environment, + disclaim: options.disclaim, + fdMap: [], + }; + + // Check for truthiness to avoid chdir("null") + if (options.workdir) { + launchOptions.workdir = options.workdir; + } + + for (let [dst, src] of fds.entries()) { + launchOptions.fdMap.push({ src, dst }); + } + + try { + this.pid = IOUtils.launchProcess(options.arguments, launchOptions); + } finally { + for (let fd of new Set(fds.values())) { + fd.dispose(); + } + } + } + + /** + * Called when input is available on our sentinel file descriptor. + * + * @see pollEvents + */ + onReady() { + // We're not actually expecting any input on this pipe. If we get any, we + // can't poll the pipe any further without reading it. + if (this.wait() == undefined) { + this.kill(9); + } + } + + /** + * Called when an error occurred while polling our sentinel file descriptor. + * + * @see pollEvents + */ + onError() { + this.wait(); + } + + /** + * Attempts to wait for the process's exit status, without blocking. If + * successful, resolves the `exitPromise` to the process's exit value. + * + * @returns {integer|null} + * The process's exit status, if it has already exited. + */ + wait() { + if (this.exitCode !== null) { + return this.exitCode; + } + + let status = ctypes.int(); + + let res = libc.waitpid(this.pid, status.address(), LIBC.WNOHANG); + // If there's a failure here and we get any errno other than EINTR, it + // means that the process has been reaped by another thread (most likely + // the nspr process wait thread), and its actual exit status is not + // available to us. In that case, we have to assume success. + if (res == 0 || (res == -1 && ctypes.errno == LIBC.EINTR)) { + return null; + } + + let sig = unix.WTERMSIG(status.value); + if (sig) { + this.exitCode = -sig; + } else { + this.exitCode = unix.WEXITSTATUS(status.value); + } + + this.fd.dispose(); + io.updatePollFds(); + this.resolveExit(this.exitCode); + return this.exitCode; + } +} + +io = { + pollFds: null, + pollHandlers: null, + + pipes: new Map(), + + processes: new Map(), + + messageCount: 0, + + running: true, + + polling: false, + + init(details) { + this.signal = new Signal(details.signalFd); + this.updatePollFds(); + + setTimeout(this.loop.bind(this), 0); + }, + + shutdown() { + if (this.running) { + this.running = false; + + this.signal.cleanup(); + this.signal = null; + + self.postMessage({ msg: "close" }); + self.close(); + } + }, + + getPipe(pipeId) { + let pipe = this.pipes.get(pipeId); + + if (!pipe) { + let error = new Error("File closed"); + error.errorCode = SubprocessConstants.ERROR_END_OF_FILE; + throw error; + } + return pipe; + }, + + getProcess(processId) { + let process = this.processes.get(processId); + + if (!process) { + throw new Error(`Invalid process ID: ${processId}`); + } + return process; + }, + + updatePollFds() { + let handlers = [ + this.signal, + ...this.pipes.values(), + ...this.processes.values(), + ]; + + handlers = handlers.filter(handler => handler.pollEvents); + + // Our poll loop is only useful if we've got at least 1 thing to poll other than our own + // signal. + if (handlers.length == 1) { + this.polling = false; + } else if (!this.polling && this.running) { + // Restart the poll loop if necessary: + setTimeout(this.loop.bind(this), 0); + this.polling = true; + } + + let pollfds = unix.pollfd.array(handlers.length)(); + + for (let [i, handler] of handlers.entries()) { + let pollfd = pollfds[i]; + + pollfd.fd = handler.fd; + pollfd.events = handler.pollEvents; + pollfd.revents = 0; + } + + this.pollFds = pollfds; + this.pollHandlers = handlers; + }, + + loop() { + this.poll(); + if (this.running && this.polling) { + setTimeout(this.loop.bind(this), 0); + } + }, + + poll() { + let handlers = this.pollHandlers; + let pollfds = this.pollFds; + + let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT; + let count = libc.poll(pollfds, pollfds.length, timeout); + + for (let i = 0; count && i < pollfds.length; i++) { + let pollfd = pollfds[i]; + if (pollfd.revents) { + count--; + + let handler = handlers[i]; + try { + let success = false; + if (pollfd.revents & handler.pollEvents) { + success = handler.onReady(); + } + // Only call the error handler in this iteration if we didn't also + // have a success. This is necessary because Linux systems set POLLHUP + // on a pipe when it's closed but there's still buffered data to be + // read, and Darwin sets POLLIN and POLLHUP on a closed pipe, even + // when there's no data to be read. + if ( + !success && + pollfd.revents & (LIBC.POLLERR | LIBC.POLLHUP | LIBC.POLLNVAL) + ) { + handler.onError(); + } + } catch (e) { + console.error(e); + debug(`Worker error: ${e} :: ${e.stack}`); + handler.onError(); + } + + pollfd.revents = 0; + } + } + }, + + addProcess(process) { + this.processes.set(process.id, process); + + for (let pipe of process.pipes) { + this.pipes.set(pipe.id, pipe); + } + }, + + cleanupProcess(process) { + this.processes.delete(process.id); + }, +}; |