diff options
Diffstat (limited to 'testing/marionette/client/marionette_driver')
3 files changed, 673 insertions, 30 deletions
diff --git a/testing/marionette/client/marionette_driver/geckoinstance.py b/testing/marionette/client/marionette_driver/geckoinstance.py index e613415886..26ad58f360 100644 --- a/testing/marionette/client/marionette_driver/geckoinstance.py +++ b/testing/marionette/client/marionette_driver/geckoinstance.py @@ -29,6 +29,10 @@ from six import reraise from . import errors +if sys.platform.startswith("darwin"): + # Marionette's own processhandler is only used on MacOS for now + from .processhandler import UNKNOWN_RETURNCODE, ProcessHandler + class GeckoInstance(object): required_prefs = { @@ -388,7 +392,7 @@ class GeckoInstance(object): } ) - return { + args = { "binary": self.binary, "profile": self.profile, "cmdargs": ["-no-remote", "-marionette"] + self.app_args, @@ -397,6 +401,13 @@ class GeckoInstance(object): "process_args": process_args, } + if sys.platform.startswith("darwin"): + # Bug 1887666: The custom process handler class for Marionette is + # only supported on MacOS at the moment. + args["process_class"] = ProcessHandler + + return args + def close(self, clean=False): """ Close the managed Gecko process. @@ -431,6 +442,19 @@ class GeckoInstance(object): self.close(clean=clean) self.start() + def update_process(self, pid, timeout=None): + """Update the process to track when the application re-launched itself""" + if sys.platform.startswith("darwin"): + # The new process handler is only supported on MacOS yet + returncode = self.runner.process_handler.update_process(pid, timeout) + if returncode not in [0, UNKNOWN_RETURNCODE]: + raise IOError( + f"Old process inappropriately quit with exit code: {returncode}" + ) + + else: + returncode = self.runner.process_handler.check_for_detached(pid) + class FennecInstance(GeckoInstance): fennec_prefs = { @@ -454,7 +478,7 @@ class FennecInstance(GeckoInstance): package_name=None, env=None, *args, - **kwargs + **kwargs, ): required_prefs = deepcopy(FennecInstance.fennec_prefs) required_prefs.update(kwargs.get("prefs", {})) diff --git a/testing/marionette/client/marionette_driver/marionette.py b/testing/marionette/client/marionette_driver/marionette.py index a4b8c73792..7eefde4fc6 100644 --- a/testing/marionette/client/marionette_driver/marionette.py +++ b/testing/marionette/client/marionette_driver/marionette.py @@ -568,7 +568,7 @@ class Marionette(object): baseurl=None, socket_timeout=None, startup_timeout=None, - **instance_args + **instance_args, ): """Construct a holder for the Marionette connection. @@ -1273,6 +1273,8 @@ class Marionette(object): exc_cls, _, tb = sys.exc_info() if self.instance.runner.returncode is None: + self.is_shutting_down = False + # The process is still running, which means the shutdown # request was not correct or the application ignored it. # Allow Marionette to accept connections again. @@ -1291,34 +1293,29 @@ class Marionette(object): tb, ) - finally: - self.is_shutting_down = False + self.is_shutting_down = False + # Create a new session to retrieve the new process id of the application self.delete_session(send_request=False) else: self.delete_session() self.instance.restart(clean=clean) + self.raise_for_port(timeout=self.DEFAULT_STARTUP_TIMEOUT) restart_details.update({"in_app": False, "forced": True}) + self.start_session(self.requested_capabilities, process_forked=in_app) + # Restore the context as used before the restart + self.set_context(context) + if restart_details.get("cause") not in (None, "restart"): raise errors.MarionetteException( "Unexpected shutdown reason '{}' for " "restarting the process".format(restart_details["cause"]) ) - self.start_session(self.requested_capabilities) - # Restore the context as used before the restart - self.set_context(context) - - if in_app and self.process_id: - # In some cases Firefox restarts itself by spawning into a new process group. - # As long as mozprocess cannot track that behavior (bug 1284864) we assist by - # informing about the new process id. - self.instance.runner.process_handler.check_for_detached(self.process_id) - return restart_details def absolute_url(self, relative_url): @@ -1330,7 +1327,7 @@ class Marionette(object): return "{0}{1}".format(self.baseurl, relative_url) @do_process_check - def start_session(self, capabilities=None, timeout=None): + def start_session(self, capabilities=None, process_forked=False, timeout=None): """Create a new WebDriver session. This method must be called before performing any other action. @@ -1340,7 +1337,10 @@ class Marionette(object): (including alwaysMatch, firstMatch, desiredCapabilities, or requriedCapabilities), and only recognises extension capabilities that are specific to Marionette. + :param process_forked: If True, the existing process forked itself due + to an internal restart. :param timeout: Optional timeout in seconds for the server to be ready. + :returns: A dictionary of the capabilities offered. """ if capabilities is None: @@ -1352,17 +1352,20 @@ class Marionette(object): self.crashed = 0 - if self.instance: - returncode = self.instance.runner.returncode - # We're managing a binary which has terminated. Start it again - # and implicitely wait for the Marionette server to be ready. - if returncode is not None: - self.start_binary(timeout) + if not process_forked: + # Only handle the binary if there was no process before which also + # might have forked itself due to a restart + if self.instance: + returncode = self.instance.runner.returncode + # We're managing a binary which has terminated. Start it again + # and implicitely wait for the Marionette server to be ready. + if returncode is not None: + self.start_binary(timeout) - else: - # In the case when Marionette doesn't manage the binary wait until - # its server component has been started. - self.raise_for_port(timeout=timeout) + else: + # In the case when Marionette doesn't manage the binary wait until + # its server component has been started. + self.raise_for_port(timeout=timeout) self.client = transport.TcpTransport(self.host, self.port, self.socket_timeout) self.protocol, _ = self.client.connect() @@ -1380,10 +1383,11 @@ class Marionette(object): self.session_id = resp["sessionId"] self.session = resp["capabilities"] self.cleanup_ran = False - # fallback to processId can be removed in Firefox 55 - self.process_id = self.session.get( - "moz:processID", self.session.get("processId") - ) + + self.process_id = self.session.get("moz:processID") + if process_forked: + self.instance.update_process(self.process_id, self.shutdown_timeout) + self.profile = self.session.get("moz:profile") timeout = self.session.get("moz:shutdownTimeout") diff --git a/testing/marionette/client/marionette_driver/processhandler.py b/testing/marionette/client/marionette_driver/processhandler.py new file mode 100644 index 0000000000..deac1b02ea --- /dev/null +++ b/testing/marionette/client/marionette_driver/processhandler.py @@ -0,0 +1,615 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +# The Marionette ProcessHandler and ProcessHandlerMixin classes are only +# utilized by Marionette as an alternative to the mozprocess package. +# +# This necessity arises because Marionette supports the application to +# restart itself and, under such conditions, fork its process. To maintain +# the ability to track the process, including permissions to terminate +# the process and receive log entries via stdout and stderr, the psutil +# package is utilized. To prevent any side effects for consumers of +# mozprocess, all necessary helper classes have been duplicated for now. + +import codecs +import os +import signal +import subprocess +import sys +import threading +import time +import traceback +from queue import Empty, Queue + +import psutil +import six + +# Set the MOZPROCESS_DEBUG environment variable to 1 to see some debugging output +MOZPROCESS_DEBUG = os.getenv("MOZPROCESS_DEBUG") + +INTERVAL_PROCESS_ALIVE_CHECK = 0.02 + +# For not self-managed processes the returncode seems to not be available. +# Use `8` to indicate this specific situation for now. +UNKNOWN_RETURNCODE = 8 + +# We dont use mozinfo because it is expensive to import, see bug 933558. +isPosix = os.name == "posix" # includes MacOS X + + +class ProcessHandlerMixin(object): + """ + A class for launching and manipulating local processes. + + :param cmd: command to run. May be a string or a list. If specified as a list, the first + element will be interpreted as the command, and all additional elements will be interpreted + as arguments to that command. + :param args: list of arguments to pass to the command (defaults to None). Must not be set when + `cmd` is specified as a list. + :param cwd: working directory for command (defaults to None). + :param env: is the environment to use for the process (defaults to os.environ). + :param kill_on_timeout: when True, the process will be killed when a timeout is reached. + When False, the caller is responsible for killing the process. + Failure to do so could cause a call to wait() to hang indefinitely. (Defaults to True.) + :param processOutputLine: function or list of functions to be called for + each line of output produced by the process (defaults to an empty + list). + :param processStderrLine: function or list of functions to be called + for each line of error output - stderr - produced by the process + (defaults to an empty list). If this is not specified, stderr lines + will be sent to the *processOutputLine* callbacks. + :param onTimeout: function or list of functions to be called when the process times out. + :param onFinish: function or list of functions to be called when the process terminates + normally without timing out. + :param kwargs: additional keyword args to pass directly into Popen. + + NOTE: Child processes will be tracked by default. + """ + + def __init__( + self, + cmd, + args=None, + cwd=None, + env=None, + kill_on_timeout=True, + processOutputLine=(), + processStderrLine=(), + onTimeout=(), + onFinish=(), + **kwargs, + ): + self.args = args + self.cmd = cmd + self.cwd = cwd + self.keywordargs = kwargs + + self.didTimeout = False + self.didOutputTimeout = False + self.proc = None + + if env is None: + env = os.environ.copy() + self.env = env + + # handlers + def to_callable_list(arg): + if callable(arg): + arg = [arg] + return CallableList(arg) + + processOutputLine = to_callable_list(processOutputLine) + processStderrLine = to_callable_list(processStderrLine) + onTimeout = to_callable_list(onTimeout) + onFinish = to_callable_list(onFinish) + + def on_timeout(): + self.didTimeout = True + self.didOutputTimeout = self.reader.didOutputTimeout + if kill_on_timeout: + self.kill() + + onTimeout.insert(0, on_timeout) + + self._stderr = subprocess.STDOUT + if processStderrLine: + self._stderr = subprocess.PIPE + self.reader = ProcessReader( + stdout_callback=processOutputLine, + stderr_callback=processStderrLine, + finished_callback=onFinish, + timeout_callback=onTimeout, + ) + + # It is common for people to pass in the entire array with the cmd and + # the args together since this is how Popen uses it. Allow for that. + if isinstance(self.cmd, list): + if self.args is not None: + raise TypeError("cmd and args must not both be lists") + (self.cmd, self.args) = (self.cmd[0], self.cmd[1:]) + elif self.args is None: + self.args = [] + + def _has_valid_proc(func): + def wrapper(self, *args, **kwargs): + if self.proc is None: + raise RuntimeError("Process hasn't been started yet") + return func(self, *args, **kwargs) + + return wrapper + + @property + @_has_valid_proc + def pid(self): + return self.proc.pid + + @staticmethod + def pid_exists(pid): + return psutil.pid_exists(pid) + + @property + def timedOut(self): + """True if the process has timed out for any reason.""" + return self.didTimeout + + @property + def outputTimedOut(self): + """True if the process has timed out for no output.""" + return self.didOutputTimeout + + @property + def commandline(self): + """the string value of the command line (command + args)""" + return subprocess.list2cmdline([self.cmd] + self.args) + + def _debug(self, msg): + if not MOZPROCESS_DEBUG: + return + + print(f"DBG::MARIONETTE ProcessHandler {self.pid} | {msg}", file=sys.stdout) + + @_has_valid_proc + def kill(self, sig=None, timeout=None): + """Kills the managed process and all its child processes. + + :param sig: Signal to use to kill the process. (Defaults to SIGKILL) + + :param timeout: If not None, wait this number of seconds for the + process to exit. + + Note that this does not manage any state, save any output etc, + it immediately kills the process. + """ + if hasattr(self, "returncode"): + return self.returncode + + if self.proc.is_running(): + processes = [self.proc] + self.proc.children(recursive=True) + + if sig is None: + # TODO: try SIGTERM first to sanely shutdown the application + # and to not break later when Windows support gets added. + sig = signal.SIGKILL + + # Do we need that? + for process in processes: + try: + self._debug(f"Killing process: {process}") + process.send_signal(sig) + except psutil.NoSuchProcess: + pass + psutil.wait_procs(processes, timeout=timeout) + + # When we kill the the managed process we also have to wait for the + # reader thread to be finished. Otherwise consumers would have to assume + # that it still has not completely shutdown. + self.returncode = self.wait(0) + if self.returncode is None: + self._debug("kill: wait failed -- process is still alive") + + return self.returncode + + @_has_valid_proc + def poll(self): + """Check if child process has terminated + + Returns the current returncode value: + - None if the process hasn't terminated yet + - A negative number if the process was killed by signal N (Unix only) + - '0' if the process ended without failures + + """ + if hasattr(self, "returncode"): + return self.returncode + + # If the process that is observed wasn't started with Popen there is + # no `poll()` method available. Use `wait()` instead and do not wait + # for the reader thread because it would cause extra delays. + return self.wait(0, wait_reader=False) + + def processOutput(self, timeout=None, outputTimeout=None): + """ + Handle process output until the process terminates or times out. + + :param timeout: If not None, the process will be allowed to continue + for that number of seconds before being killed. + + :outputTimeout: If not None, the process will be allowed to continue + for that number of seconds without producing any output before + being killed. + """ + # this method is kept for backward compatibility + if not hasattr(self, "proc"): + self.run(timeout=timeout, outputTimeout=outputTimeout) + # self.run will call this again + return + if not self.reader.is_alive(): + self.reader.timeout = timeout + self.reader.output_timeout = outputTimeout + self.reader.start(self.proc) + + def run(self, timeout=None, outputTimeout=None): + """ + Starts the process. + + :param timeout: If not None, the process will be allowed to continue for + that number of seconds before being killed. If the process is killed + due to a timeout, the onTimeout handler will be called. + + :outputTimeout: If not None, the process will be allowed to continue + for that number of seconds without producing any output before + being killed. + """ + self.didTimeout = False + self.didOutputTimeout = False + + # default arguments + args = dict( + stdout=subprocess.PIPE, + stderr=self._stderr, + cwd=self.cwd, + env=self.env, + ) + + # build process arguments + args.update(self.keywordargs) + + # launch the process + self.proc = psutil.Popen([self.cmd] + self.args, **args) + + self.processOutput(timeout=timeout, outputTimeout=outputTimeout) + + @_has_valid_proc + def update_process(self, new_pid, timeout=None): + """Update the internally managed process for the provided process ID. + + When the application restarts itself, such as during an update, the new + process is essentially a fork of itself. To continue monitoring this + process, the process ID needs to be updated accordingly. + + :param new_pid: The ID of the new (forked) process to track. + + :timeout: If not None, the old process will be allowed to continue for + that number of seconds before being killed. + """ + if isPosix: + if new_pid == self.pid: + return + + print( + 'Child process with id "%s" has been marked as detached because it is no ' + "longer in the managed process group. Keeping reference to the process id " + '"%s" which is the new child process.' % (self.pid, new_pid), + file=sys.stdout, + ) + + returncode = self.wait(timeout, wait_reader=False) + if returncode is None: + # If the process is still running force kill it. + returncode = self.kill() + + if hasattr(self, "returncode"): + del self.returncode + + self.proc = psutil.Process(new_pid) + self._debug( + f"New process status: {self.proc} (terminal={self.proc.terminal()})" + ) + + return returncode + + @_has_valid_proc + def wait(self, timeout=None, wait_reader=True): + """ + Waits until the process is terminated. + + :param timeout: If not None, will return after timeout seconds. + This timeout only causes the wait function to return and + does not kill the process. + + :param wait_reader: If set to True, it waits not only for the process + to exit but also for all output to be fully read. (Defaults to True). + + Returns the process exit code value: + - None if the process hasn't terminated yet + - A negative number if the process was killed by signal N (Unix only) + - '0' if the process ended without failures + + """ + # Thread.join() blocks the main thread until the reader thread is finished + # wake up once a second in case a keyboard interrupt is sent + if ( + wait_reader + and self.reader.thread + and self.reader.thread is not threading.current_thread() + ): + count = 0 + while self.reader.is_alive(): + if timeout is not None and count > timeout: + self._debug("wait timeout for reader thread") + return None + self.reader.join(timeout=1) + count += 1 + + try: + self.proc.wait(timeout) + self._debug(f"Process status after wait: {self.proc}") + + if not isinstance(self.proc, psutil.Popen): + self._debug( + "Not self-managed processes do not have a returncode. " + f"Setting its value to {UNKNOWN_RETURNCODE}." + ) + self.returncode = UNKNOWN_RETURNCODE + + else: + self.returncode = self.proc.returncode + + return self.returncode + except psutil.TimeoutExpired: + return None + + +class CallableList(list): + def __call__(self, *args, **kwargs): + for e in self: + e(*args, **kwargs) + + def __add__(self, lst): + return CallableList(list.__add__(self, lst)) + + +class ProcessReader(object): + def __init__( + self, + stdout_callback=None, + stderr_callback=None, + finished_callback=None, + timeout_callback=None, + timeout=None, + output_timeout=None, + ): + self.stdout_callback = stdout_callback or (lambda line: True) + self.stderr_callback = stderr_callback or (lambda line: True) + self.finished_callback = finished_callback or (lambda: True) + self.timeout_callback = timeout_callback or (lambda: True) + self.timeout = timeout + self.output_timeout = output_timeout + self.thread = None + self.didOutputTimeout = False + + def debug(self, msg): + if not MOZPROCESS_DEBUG: + return + + print("DBG::MARIONETTE ProcessReader | {}".format(msg), file=sys.stdout) + + def _create_stream_reader(self, name, stream, queue, callback): + thread = threading.Thread( + name=name, target=self._read_stream, args=(stream, queue, callback) + ) + thread.daemon = True + thread.start() + return thread + + def _read_stream(self, stream, queue, callback): + while True: + line = stream.readline() + if not line: + break + queue.put((line, callback)) + stream.close() + + def start(self, proc): + queue = Queue() + stdout_reader = None + if proc.stdout: + stdout_reader = self._create_stream_reader( + "ProcessReaderStdout", proc.stdout, queue, self.stdout_callback + ) + stderr_reader = None + if proc.stderr and proc.stderr != proc.stdout: + stderr_reader = self._create_stream_reader( + "ProcessReaderStderr", proc.stderr, queue, self.stderr_callback + ) + self.thread = threading.Thread( + name="ProcessReader", + target=self._read, + args=(stdout_reader, stderr_reader, queue), + ) + self.thread.daemon = True + self.thread.start() + self.debug("ProcessReader started") + + def _read(self, stdout_reader, stderr_reader, queue): + start_time = time.time() + timed_out = False + timeout = self.timeout + if timeout is not None: + timeout += start_time + output_timeout = self.output_timeout + if output_timeout is not None: + output_timeout += start_time + + while (stdout_reader and stdout_reader.is_alive()) or ( + stderr_reader and stderr_reader.is_alive() + ): + has_line = True + try: + line, callback = queue.get(True, INTERVAL_PROCESS_ALIVE_CHECK) + except Empty: + has_line = False + now = time.time() + if not has_line: + if output_timeout is not None and now > output_timeout: + timed_out = True + self.didOutputTimeout = True + break + else: + if output_timeout is not None: + output_timeout = now + self.output_timeout + callback(line.rstrip()) + if timeout is not None and now > timeout: + timed_out = True + break + self.debug("_read loop exited") + # process remaining lines to read + while not queue.empty(): + line, callback = queue.get(False) + try: + callback(line.rstrip()) + except Exception: + traceback.print_exc() + if timed_out: + try: + self.timeout_callback() + except Exception: + traceback.print_exc() + if stdout_reader: + stdout_reader.join() + if stderr_reader: + stderr_reader.join() + if not timed_out: + try: + self.finished_callback() + except Exception: + traceback.print_exc() + self.debug("_read exited") + + def is_alive(self): + if self.thread: + return self.thread.is_alive() + return False + + def join(self, timeout=None): + if self.thread: + self.thread.join(timeout=timeout) + + +# default output handlers +# these should be callables that take the output line + + +class StoreOutput(object): + """accumulate stdout""" + + def __init__(self): + self.output = [] + + def __call__(self, line): + self.output.append(line) + + +class StreamOutput(object): + """pass output to a stream and flush""" + + def __init__(self, stream, text=True): + self.stream = stream + self.text = text + + def __call__(self, line): + ensure = six.ensure_text if self.text else six.ensure_binary + try: + self.stream.write(ensure(line, errors="ignore") + ensure("\n")) + except TypeError: + print( + "HEY! If you're reading this, you're about to encounter a " + "type error, probably as a result of a conversion from " + "Python 2 to Python 3. This is almost definitely because " + "you're trying to write binary data to a text-encoded " + "stream, or text data to a binary-encoded stream. Check how " + "you're instantiating your ProcessHandler and if the output " + "should be text-encoded, make sure you pass " + "universal_newlines=True.", + file=sys.stderr, + ) + raise + self.stream.flush() + + +class LogOutput(StreamOutput): + """pass output to a file""" + + def __init__(self, filename): + self.file_obj = open(filename, "a") + StreamOutput.__init__(self, self.file_obj, True) + + def __del__(self): + if self.file_obj is not None: + self.file_obj.close() + + +# front end class with the default handlers + + +class ProcessHandler(ProcessHandlerMixin): + """ + Convenience class for handling processes with default output handlers. + + By default, all output is sent to stdout. This can be disabled by setting + the *stream* argument to None. + + If processOutputLine keyword argument is specified the function or the + list of functions specified by this argument will be called for each line + of output; the output will not be written to stdout automatically then + if stream is True (the default). + + If storeOutput==True, the output produced by the process will be saved + as self.output. + + If logfile is not None, the output produced by the process will be + appended to the given file. + """ + + def __init__(self, cmd, logfile=None, stream=True, storeOutput=True, **kwargs): + kwargs.setdefault("processOutputLine", []) + if callable(kwargs["processOutputLine"]): + kwargs["processOutputLine"] = [kwargs["processOutputLine"]] + + if logfile: + logoutput = LogOutput(logfile) + kwargs["processOutputLine"].append(logoutput) + + text = kwargs.get("universal_newlines", False) or kwargs.get("text", False) + + if stream is True: + if text: + # The encoding of stdout isn't guaranteed to be utf-8. Fix that. + stdout = codecs.getwriter("utf-8")(sys.stdout.buffer) + else: + stdout = sys.stdout.buffer + + if not kwargs["processOutputLine"]: + kwargs["processOutputLine"].append(StreamOutput(stdout, text)) + elif stream: + streamoutput = StreamOutput(stream, text) + kwargs["processOutputLine"].append(streamoutput) + + self.output = None + if storeOutput: + storeoutput = StoreOutput() + self.output = storeoutput.output + kwargs["processOutputLine"].append(storeoutput) + + ProcessHandlerMixin.__init__(self, cmd, **kwargs) |