/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */ /* vim: set sts=2 sw=2 et tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ "use strict"; /* eslint-env mozilla/chrome-worker */ /* exported Process */ /* import-globals-from subprocess_shared.js */ /* import-globals-from subprocess_shared_unix.js */ /* import-globals-from subprocess_worker_common.js */ importScripts( "resource://gre/modules/subprocess/subprocess_shared.js", "resource://gre/modules/subprocess/subprocess_shared_unix.js", "resource://gre/modules/subprocess/subprocess_worker_common.js" ); const POLL_TIMEOUT = 5000; let io; let nextPipeId = 0; class Pipe extends BasePipe { constructor(process, fd) { super(); this.process = process; this.fd = fd; this.id = nextPipeId++; } get pollEvents() { throw new Error("Not implemented"); } /** * Closes the file descriptor. * * @param {boolean} [force=false] * If true, the file descriptor is closed immediately. If false, the * file descriptor is closed after all current pending IO operations * have completed. * * @returns {Promise} * Resolves when the file descriptor has been closed. */ close(force = false) { if (!force && this.pending.length) { this.closing = true; return this.closedPromise; } for (let { reject } of this.pending) { let error = new Error("File closed"); error.errorCode = SubprocessConstants.ERROR_END_OF_FILE; reject(error); } this.pending.length = 0; if (!this.closed) { this.fd.dispose(); this.closed = true; this.resolveClosed(); io.pipes.delete(this.id); io.updatePollFds(); } return this.closedPromise; } /** * Called when an error occurred while polling our file descriptor. */ onError() { this.close(true); this.process.wait(); } } class InputPipe extends Pipe { /** * A bit mask of poll() events which we currently wish to be notified of on * this file descriptor. */ get pollEvents() { if (this.pending.length) { return LIBC.POLLIN; } return 0; } /** * Asynchronously reads at most `length` bytes of binary data from the file * descriptor into an ArrayBuffer of the same size. Returns a promise which * resolves when the operation is complete. * * @param {integer} length * The number of bytes to read. * * @returns {Promise} */ read(length) { if (this.closing || this.closed) { throw new Error("Attempt to read from closed pipe"); } return new Promise((resolve, reject) => { this.pending.push({ resolve, reject, length }); io.updatePollFds(); }); } /** * Synchronously reads at most `count` bytes of binary data into an * ArrayBuffer, and returns that buffer. If no data can be read without * blocking, returns null instead. * * @param {integer} count * The number of bytes to read. * * @returns {ArrayBuffer|null} */ readBuffer(count) { let buffer = new ArrayBuffer(count); let read = +libc.read(this.fd, buffer, buffer.byteLength); if (read < 0 && ctypes.errno != LIBC.EAGAIN) { this.onError(); } if (read <= 0) { return null; } if (read < buffer.byteLength) { return ArrayBuffer_transfer(buffer, read); } return buffer; } /** * Called when one of the IO operations matching the `pollEvents` mask may be * performed without blocking. * * @returns {boolean} * True if any data was successfully read. */ onReady() { let result = false; let reads = this.pending; while (reads.length) { let { resolve, length } = reads[0]; let buffer = this.readBuffer(length); if (buffer) { result = true; this.shiftPending(); resolve(buffer); } else { break; } } if (!reads.length) { io.updatePollFds(); } return result; } } class OutputPipe extends Pipe { /** * A bit mask of poll() events which we currently wish to be notified of on * this file discriptor. */ get pollEvents() { if (this.pending.length) { return LIBC.POLLOUT; } return 0; } /** * Asynchronously writes the given buffer to our file descriptor, and returns * a promise which resolves when the operation is complete. * * @param {ArrayBuffer} buffer * The buffer to write. * * @returns {Promise} * Resolves to the number of bytes written when the operation is * complete. */ write(buffer) { if (this.closing || this.closed) { throw new Error("Attempt to write to closed pipe"); } return new Promise((resolve, reject) => { this.pending.push({ resolve, reject, buffer, length: buffer.byteLength }); io.updatePollFds(); }); } /** * Attempts to synchronously write the given buffer to our file descriptor. * Writes only as many bytes as can be written without blocking, and returns * the number of byes successfully written. * * Closes the file descriptor if an IO error occurs. * * @param {ArrayBuffer} buffer * The buffer to write. * * @returns {integer} * The number of bytes successfully written. */ writeBuffer(buffer) { let bytesWritten = libc.write(this.fd, buffer, buffer.byteLength); if (bytesWritten < 0 && ctypes.errno != LIBC.EAGAIN) { this.onError(); } return bytesWritten; } /** * Called when one of the IO operations matching the `pollEvents` mask may be * performed without blocking. */ onReady() { let writes = this.pending; while (writes.length) { let { buffer, resolve, length } = writes[0]; let written = this.writeBuffer(buffer); if (written == buffer.byteLength) { resolve(length); this.shiftPending(); } else if (written > 0) { writes[0].buffer = buffer.slice(written); } else { break; } } if (!writes.length) { io.updatePollFds(); } } } class Signal { constructor(fd) { this.fd = fd; } cleanup() { libc.close(this.fd); this.fd = null; } get pollEvents() { return LIBC.POLLIN; } /** * Called when an error occurred while polling our file descriptor. */ onError() { io.shutdown(); } /** * Called when one of the IO operations matching the `pollEvents` mask may be * performed without blocking. */ onReady() { let buffer = new ArrayBuffer(16); let count = +libc.read(this.fd, buffer, buffer.byteLength); if (count > 0) { io.messageCount += count; } } } class Process extends BaseProcess { /** * Each Process object opens an additional pipe from the target object, which * will be automatically closed when the process exits, but otherwise * carries no data. * * This property contains a bit mask of poll() events which we wish to be * notified of on this descriptor. We're not expecting any input from this * pipe, but we need to poll for input until the process exits in order to be * notified when the pipe closes. */ get pollEvents() { if (this.exitCode === null) { return LIBC.POLLIN; } return 0; } /** * Kills the process with the given signal. * * @param {integer} signal */ kill(signal) { libc.kill(this.pid, signal); this.wait(); } /** * Initializes the IO pipes for use as standard input, output, and error * descriptors in the spawned process. * * @param {object} options * The Subprocess options object for this process. * @returns {unix.Fd[]} * The array of file descriptors belonging to the spawned process. */ initPipes(options) { let stderr = options.stderr; let our_pipes = []; let their_pipes = new Map(); let pipe = input => { let fds = ctypes.int.array(2)(); let res = libc.pipe(fds); if (res == -1) { throw new Error("Unable to create pipe"); } fds = Array.from(fds, unix.Fd); if (input) { fds.reverse(); } if (input) { our_pipes.push(new InputPipe(this, fds[1])); } else { our_pipes.push(new OutputPipe(this, fds[1])); } libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC); libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC); libc.fcntl(fds[1], LIBC.F_SETFL, LIBC.O_NONBLOCK); return fds[0]; }; their_pipes.set(0, pipe(false)); their_pipes.set(1, pipe(true)); if (stderr == "pipe") { their_pipes.set(2, pipe(true)); } else if (stderr == "stdout") { their_pipes.set(2, their_pipes.get(1)); } // Create an additional pipe that we can use to monitor for process exit. their_pipes.set(3, pipe(true)); this.fd = our_pipes.pop().fd; this.pipes = our_pipes; return their_pipes; } spawn(options) { let fds = this.initPipes(options); let launchOptions = { environment: options.environment, disclaim: options.disclaim, fdMap: [], }; // Check for truthiness to avoid chdir("null") if (options.workdir) { launchOptions.workdir = options.workdir; } for (let [dst, src] of fds.entries()) { launchOptions.fdMap.push({ src, dst }); } try { this.pid = IOUtils.launchProcess(options.arguments, launchOptions); } finally { for (let fd of new Set(fds.values())) { fd.dispose(); } } } /** * Called when input is available on our sentinel file descriptor. * * @see pollEvents */ onReady() { // We're not actually expecting any input on this pipe. If we get any, we // can't poll the pipe any further without reading it. if (this.wait() == undefined) { this.kill(9); } } /** * Called when an error occurred while polling our sentinel file descriptor. * * @see pollEvents */ onError() { this.wait(); } /** * Attempts to wait for the process's exit status, without blocking. If * successful, resolves the `exitPromise` to the process's exit value. * * @returns {integer|null} * The process's exit status, if it has already exited. */ wait() { if (this.exitCode !== null) { return this.exitCode; } let status = ctypes.int(); let res = libc.waitpid(this.pid, status.address(), LIBC.WNOHANG); // If there's a failure here and we get any errno other than EINTR, it // means that the process has been reaped by another thread (most likely // the nspr process wait thread), and its actual exit status is not // available to us. In that case, we have to assume success. if (res == 0 || (res == -1 && ctypes.errno == LIBC.EINTR)) { return null; } let sig = unix.WTERMSIG(status.value); if (sig) { this.exitCode = -sig; } else { this.exitCode = unix.WEXITSTATUS(status.value); } this.fd.dispose(); io.updatePollFds(); this.resolveExit(this.exitCode); return this.exitCode; } } io = { pollFds: null, pollHandlers: null, pipes: new Map(), processes: new Map(), messageCount: 0, running: true, polling: false, init(details) { this.signal = new Signal(details.signalFd); this.updatePollFds(); setTimeout(this.loop.bind(this), 0); }, shutdown() { if (this.running) { this.running = false; this.signal.cleanup(); this.signal = null; self.postMessage({ msg: "close" }); self.close(); } }, getPipe(pipeId) { let pipe = this.pipes.get(pipeId); if (!pipe) { let error = new Error("File closed"); error.errorCode = SubprocessConstants.ERROR_END_OF_FILE; throw error; } return pipe; }, getProcess(processId) { let process = this.processes.get(processId); if (!process) { throw new Error(`Invalid process ID: ${processId}`); } return process; }, updatePollFds() { let handlers = [ this.signal, ...this.pipes.values(), ...this.processes.values(), ]; handlers = handlers.filter(handler => handler.pollEvents); // Our poll loop is only useful if we've got at least 1 thing to poll other than our own // signal. if (handlers.length == 1) { this.polling = false; } else if (!this.polling && this.running) { // Restart the poll loop if necessary: setTimeout(this.loop.bind(this), 0); this.polling = true; } let pollfds = unix.pollfd.array(handlers.length)(); for (let [i, handler] of handlers.entries()) { let pollfd = pollfds[i]; pollfd.fd = handler.fd; pollfd.events = handler.pollEvents; pollfd.revents = 0; } this.pollFds = pollfds; this.pollHandlers = handlers; }, loop() { this.poll(); if (this.running && this.polling) { setTimeout(this.loop.bind(this), 0); } }, poll() { let handlers = this.pollHandlers; let pollfds = this.pollFds; let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT; let count = libc.poll(pollfds, pollfds.length, timeout); for (let i = 0; count && i < pollfds.length; i++) { let pollfd = pollfds[i]; if (pollfd.revents) { count--; let handler = handlers[i]; try { let success = false; if (pollfd.revents & handler.pollEvents) { success = handler.onReady(); } // Only call the error handler in this iteration if we didn't also // have a success. This is necessary because Linux systems set POLLHUP // on a pipe when it's closed but there's still buffered data to be // read, and Darwin sets POLLIN and POLLHUP on a closed pipe, even // when there's no data to be read. if ( !success && pollfd.revents & (LIBC.POLLERR | LIBC.POLLHUP | LIBC.POLLNVAL) ) { handler.onError(); } } catch (e) { console.error(e); debug(`Worker error: ${e} :: ${e.stack}`); handler.onError(); } pollfd.revents = 0; } } }, addProcess(process) { this.processes.set(process.id, process); for (let pipe of process.pipes) { this.pipes.set(pipe.id, pipe); } }, cleanupProcess(process) { this.processes.delete(process.id); }, };