diff options
Diffstat (limited to 'testing/mozbase/mozdevice')
-rw-r--r-- | testing/mozbase/mozdevice/mozdevice/__init__.py | 181 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/mozdevice/adb.py | 4440 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/mozdevice/adb_android.py | 13 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/mozdevice/remote_process_monitor.py | 285 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/mozdevice/version_codes.py | 70 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/setup.cfg | 2 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/setup.py | 34 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/tests/conftest.py | 236 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/tests/manifest.ini | 6 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/tests/test_chown.py | 67 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/tests/test_escape_command_line.py | 21 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/tests/test_is_app_installed.py | 38 | ||||
-rw-r--r-- | testing/mozbase/mozdevice/tests/test_socket_connection.py | 124 |
13 files changed, 5517 insertions, 0 deletions
diff --git a/testing/mozbase/mozdevice/mozdevice/__init__.py b/testing/mozbase/mozdevice/mozdevice/__init__.py new file mode 100644 index 0000000000..e8e4965b92 --- /dev/null +++ b/testing/mozbase/mozdevice/mozdevice/__init__.py @@ -0,0 +1,181 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +"""mozdevice provides a Python interface to the Android Debug Bridge (adb) for Android Devices. + +mozdevice exports the following classes: + +ADBProcess is a class which is used by ADBCommand to execute commands +via subprocess.Popen. + +ADBCommand is an internal only class which provides the basics of +the interfaces for connecting to a device, and executing commands +either on the host or device using ADBProcess. + +ADBHost is a Python class used to execute commands which are not +necessarily associated with a specific device. It is intended to be +used directly. + +ADBDevice is a Python class used to execute commands which will +interact with a specific connected Android device. + +ADBAndroid inherits directly from ADBDevice and is essentially a +synonym for ADBDevice. It is included for backwards compatibility only +and should not be used in new code. + +ADBDeviceFactory is a Python function used to create instances of +ADBDevice. ADBDeviceFactory is preferred over using ADBDevice to +create new instances of ADBDevice since it will only create one +instance of ADBDevice for each connected device. + +mozdevice exports the following exceptions: + +:: + + Exception - + |- ADBTimeoutError + |- ADBDeviceFactoryError + |- ADBError + |- ADBProcessError + |- ADBListDevicesError + +ADBTimeoutError is a special exception that is not part of the +ADBError class hierarchy. It is raised when a command has failed to +complete within the specified timeout period. Since this typically is +due to a failure in the usb connection to the device and is not +recoverable, it is implemented separately from ADBError so that it +will not be caught by normal except clause handling of expected error +conditions and is considered to be treated as a *fatal* error. + +ADBDeviceFactoryError is also a special exception that is not part +of the ADBError class hierarchy. It is raised by ADBDeviceFactory +when the state of the internal ADBDevices object is in an +inconsistent state and is considered to be a *fatal* error. + +ADBListDevicesError is an instance of ADBError which is +raised only by the ADBHost.devices() method to signify that +``adb devices`` reports that the device state has no permissions and can +not be contacted via adb. + +ADBProcessError is an instance of ADBError which is raised when a +process executed via ADBProcess has exited with a non-zero exit +code. It is raised by the ADBCommand.command method and the methods +that call it. + +ADBError is a generic exception class to signify that some error +condition has occured which may be handled depending on the semantics +of the executing code. + +Example: + +:: + + from mozdevice import ADBHost, ADBDeviceFactory, ADBError + + adbhost = ADBHost() + try: + adbhost.kill_server() + adbhost.start_server() + except ADBError as e: + print('Unable to restart the adb server: {}'.format(str(e))) + + device = ADBDeviceFactory() + try: + sdcard_contents = device.ls('/sdcard/') # List the contents of the sdcard on the device. + print('sdcard contains {}'.format(' '.join(sdcard_contents)) + except ADBError as e: + print('Unable to list the sdcard: {}'.format(str(e))) + +Android devices use a security model based upon user permissions much +like that used in Linux upon which it is based. The adb shell executes +commands on the device as the shell user whose access to the files and +directories on the device are limited by the directory and file +permissions set in the device's file system. + +Android apps run under their own user accounts and are restricted by +the app's requested permissions in terms of what commands and files +and directories they may access. + +Like Linux, Android supports a root user who has unrestricted access +to the command and content stored on the device. + +Most commercially released Android devices do not allow adb to run +commands as the root user. Typically, only Android emulators running +certain system images, devices which have AOSP debug or engineering +Android builds or devices which have been *rooted* can run commands as +the root user. + +ADBDevice supports using both unrooted and rooted devices by laddering +its capabilities depending on the specific circumstances where it is +used. + +ADBDevice uses a special location on the device, called the +*test_root*, where it places content to be tested. This can include +binary executables and libraries, configuration files and log +files. Since the special location /data/local/tmp is usually +accessible by the shell user, the test_root is located at +/data/local/tmp/test_root by default. /data/local/tmp is used instead +of the sdcard due to recent Scoped Storage restrictions on access to +the sdcard in Android 10 and later. + +If the device supports running adbd as root, or if the device has been +rooted and supports the use of the su command to run commands as root, +ADBDevice will default to running all shell commands under the root +user and the test_root will remain set to /data/local/tmp/test_root +unless changed. + +If the device does not support running shell commands under the root +user, and a *debuggable* app is set in ADBDevice property +run_as_package, then ADBDevice will set the test_root to +/data/data/<app-package-name>/test_root and will run shell commands as +the app user when accessing content located in the app's data +directory. Content can be pushed to the app's data directory or pulled +from the app's data directory by using the command run-as to access +the app's data. + +If a device does not support running commands as root and a +*debuggable* app is not being used, command line programs can still be +executed by pushing them to the /data/local/tmp directory which is +accessible to the shell user. + +If for some reason, the device is not rooted and /data/local/tmp is +not acccessible to the shell user, then ADBDevice will fail to +initialize and will not be useable for that device. + +NOTE: ADBFactory will clear the contents of the test_root when it +first creates an instance of ADBDevice. + +When the run_as_package property is set in an ADBDevice instance, it +will clear the contents of the current test_root before changing the +test_root to point to the new location +/data/data/<app-package-name>/test_root which will then be cleared of +any existing content. + +""" + +from .adb import ( + ADBCommand, + ADBDevice, + ADBDeviceFactory, + ADBError, + ADBHost, + ADBProcess, + ADBProcessError, + ADBTimeoutError, +) +from .adb_android import ADBAndroid +from .remote_process_monitor import RemoteProcessMonitor + +__all__ = [ + "ADBError", + "ADBProcessError", + "ADBTimeoutError", + "ADBProcess", + "ADBCommand", + "ADBHost", + "ADBDevice", + "ADBAndroid", + "ADBDeviceFactory", + "RemoteProcessMonitor", +] diff --git a/testing/mozbase/mozdevice/mozdevice/adb.py b/testing/mozbase/mozdevice/mozdevice/adb.py new file mode 100644 index 0000000000..cf1b78064e --- /dev/null +++ b/testing/mozbase/mozdevice/mozdevice/adb.py @@ -0,0 +1,4440 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +import io +import os +import pipes +import posixpath +import re +import shlex +import shutil +import signal +import subprocess +import sys +import tempfile +import time +import traceback +from distutils import dir_util +from threading import Thread + +import six +from six.moves import range + +from . import version_codes + +_TEST_ROOT = None + + +class ADBProcess(object): + """ADBProcess encapsulates the data related to executing the adb process.""" + + def __init__(self, args, use_stdout_pipe=False, timeout=None): + #: command argument list. + self.args = args + Popen_args = {} + + #: Temporary file handle to be used for stdout. + if use_stdout_pipe: + self.stdout_file = subprocess.PIPE + # Reading utf-8 from the stdout pipe + if sys.version_info >= (3, 6): + Popen_args["encoding"] = "utf-8" + else: + Popen_args["universal_newlines"] = True + else: + self.stdout_file = tempfile.NamedTemporaryFile(mode="w+b") + Popen_args["stdout"] = self.stdout_file + + #: boolean indicating if the command timed out. + self.timedout = None + + #: exitcode of the process. + self.exitcode = None + + #: subprocess Process object used to execute the command. + Popen_args["stderr"] = subprocess.STDOUT + self.proc = subprocess.Popen(args, **Popen_args) + + # If a timeout is set, then create a thread responsible for killing the + # process, as well as updating the exitcode and timedout status. + def timeout_thread(adb_process, timeout): + start_time = time.time() + polling_interval = 0.001 + adb_process.exitcode = adb_process.proc.poll() + while (time.time() - start_time) <= float( + timeout + ) and adb_process.exitcode is None: + time.sleep(polling_interval) + adb_process.exitcode = adb_process.proc.poll() + + if adb_process.exitcode is None: + adb_process.proc.kill() + adb_process.timedout = True + adb_process.exitcode = adb_process.proc.poll() + + if timeout: + Thread(target=timeout_thread, args=(self, timeout), daemon=True).start() + + @property + def stdout(self): + """Return the contents of stdout.""" + assert not self.stdout_file == subprocess.PIPE + if not self.stdout_file or self.stdout_file.closed: + content = "" + else: + self.stdout_file.seek(0, os.SEEK_SET) + content = six.ensure_str(self.stdout_file.read().rstrip()) + return content + + def __str__(self): + # Remove -s <serialno> from the error message to allow bug suggestions + # to be independent of the individual failing device. + arg_string = " ".join(self.args) + arg_string = re.sub(r" -s [\w-]+", "", arg_string) + return "args: %s, exitcode: %s, stdout: %s" % ( + arg_string, + self.exitcode, + self.stdout, + ) + + def __iter__(self): + assert self.stdout_file == subprocess.PIPE + return self + + def __next__(self): + assert self.stdout_file == subprocess.PIPE + try: + return next(self.proc.stdout) + except StopIteration: + # Wait until the process ends. + while self.exitcode is None or self.timedout: + time.sleep(0.001) + raise StopIteration + + +# ADBError and ADBTimeoutError are treated differently in order that +# ADBTimeoutErrors can be handled distinctly from ADBErrors. + + +class ADBError(Exception): + """ADBError is raised in situations where a command executed on a + device either exited with a non-zero exitcode or when an + unexpected error condition has occurred. Generally, ADBErrors can + be handled and the device can continue to be used. + """ + + pass + + +class ADBProcessError(ADBError): + """ADBProcessError is raised when an associated ADBProcess is + available and relevant. + """ + + def __init__(self, adb_process): + ADBError.__init__(self, str(adb_process)) + self.adb_process = adb_process + + +class ADBListDevicesError(ADBError): + """ADBListDevicesError is raised when errors are found listing the + devices, typically not any permissions. + + The devices information is stocked with the *devices* member. + """ + + def __init__(self, msg, devices): + ADBError.__init__(self, msg) + self.devices = devices + + +class ADBTimeoutError(Exception): + """ADBTimeoutError is raised when either a host command or shell + command takes longer than the specified timeout to execute. The + timeout value is set in the ADBCommand constructor and is 300 seconds by + default. This error is typically fatal since the host is having + problems communicating with the device. You may be able to recover + by rebooting, but this is not guaranteed. + + Recovery options are: + + * Killing and restarting the adb server via + :: + + adb kill-server; adb start-server + + * Rebooting the device manually. + * Rebooting the host. + """ + + pass + + +class ADBDeviceFactoryError(Exception): + """ADBDeviceFactoryError is raised when the ADBDeviceFactory is in + an inconsistent state. + """ + + pass + + +class ADBCommand(object): + """ADBCommand provides a basic interface to adb commands + which is used to provide the 'command' methods for the + classes ADBHost and ADBDevice. + + ADBCommand should only be used as the base class for other + classes and should not be instantiated directly. To enforce this + restriction calling ADBCommand's constructor will raise a + NonImplementedError exception. + + :param str adb: path to adb executable. Defaults to 'adb'. + :param str adb_host: host of the adb server. + :param int adb_port: port of the adb server. + :param str logger_name: logging logger name. Defaults to 'adb'. + :param int timeout: The default maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value defaults to 300. + :param bool verbose: provide verbose output + :param bool use_root: Use root if available on device + :raises: :exc:`ADBError` + :exc:`ADBTimeoutError` + + :: + + from mozdevice import ADBCommand + + try: + adbcommand = ADBCommand() + except NotImplementedError: + print "ADBCommand can not be instantiated." + """ + + def __init__( + self, + adb="adb", + adb_host=None, + adb_port=None, + logger_name="adb", + timeout=300, + verbose=False, + use_root=True, + ): + if self.__class__ == ADBCommand: + raise NotImplementedError + + self._logger = self._get_logger(logger_name, verbose) + self._verbose = verbose + self._use_root = use_root + self._adb_path = adb + self._adb_host = adb_host + self._adb_port = adb_port + self._timeout = timeout + self._polling_interval = 0.001 + self._adb_version = "" + + self._logger.debug("%s: %s" % (self.__class__.__name__, self.__dict__)) + + # catch early a missing or non executable adb command + # and get the adb version while we are at it. + try: + output = subprocess.Popen( + [adb, "version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ).communicate() + re_version = re.compile(r"Android Debug Bridge version (.*)") + if isinstance(output[0], six.binary_type): + self._adb_version = re_version.match( + output[0].decode("utf-8", "replace") + ).group(1) + else: + self._adb_version = re_version.match(output[0]).group(1) + + if self._adb_version < "1.0.36": + raise ADBError( + "adb version %s less than minimum 1.0.36" % self._adb_version + ) + + except Exception as exc: + raise ADBError("%s: %s is not executable." % (exc, adb)) + + def _get_logger(self, logger_name, verbose): + logger = None + level = "DEBUG" if verbose else "INFO" + try: + import mozlog + + logger = mozlog.get_default_logger(logger_name) + if not logger: + if sys.__stdout__.isatty(): + defaults = {"mach": sys.stdout} + else: + defaults = {"tbpl": sys.stdout} + logger = mozlog.commandline.setup_logging( + logger_name, {}, defaults, formatter_defaults={"level": level} + ) + except ImportError: + pass + + if logger is None: + import logging + + logger = logging.getLogger(logger_name) + logger.setLevel(level) + return logger + + # Host Command methods + + def command(self, cmds, device_serial=None, timeout=None): + """Executes an adb command on the host. + + :param list cmds: The command and its arguments to be + executed. + :param str device_serial: The device's + serial number if the adb command is to be executed against + a specific device. If it is not specified, ANDROID_SERIAL + from the environment will be used if it is set. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBCommand constructor is used. + :return: :class:`ADBProcess` + + command() provides a low level interface for executing + commands on the host via adb. + + command() executes on the host in such a fashion that stdout + of the adb process is a file handle on the host and + the exit code is available as the exit code of the adb + process. + + The caller provides a list containing commands, as well as a + timeout period in seconds. + + A subprocess is spawned to execute adb with stdout and stderr + directed to a temporary file. If the process takes longer than + the specified timeout, the process is terminated. + + It is the caller's responsibilty to clean up by closing + the stdout temporary file. + """ + args = [self._adb_path] + device_serial = device_serial or os.environ.get("ANDROID_SERIAL") + if self._adb_host: + args.extend(["-H", self._adb_host]) + if self._adb_port: + args.extend(["-P", str(self._adb_port)]) + if device_serial: + args.extend(["-s", device_serial, "wait-for-device"]) + args.extend(cmds) + + adb_process = ADBProcess(args) + + if timeout is None: + timeout = self._timeout + + start_time = time.time() + adb_process.exitcode = adb_process.proc.poll() + while (time.time() - start_time) <= float( + timeout + ) and adb_process.exitcode is None: + time.sleep(self._polling_interval) + adb_process.exitcode = adb_process.proc.poll() + if adb_process.exitcode is None: + adb_process.proc.kill() + adb_process.timedout = True + adb_process.exitcode = adb_process.proc.poll() + + adb_process.stdout_file.seek(0, os.SEEK_SET) + + return adb_process + + def command_output(self, cmds, device_serial=None, timeout=None): + """Executes an adb command on the host returning stdout. + + :param list cmds: The command and its arguments to be + executed. + :param str device_serial: The device's + serial number if the adb command is to be executed against + a specific device. If it is not specified, ANDROID_SERIAL + from the environment will be used if it is set. + :param int timeout: The maximum time in seconds + for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBCommand constructor is used. + :return: str - content of stdout. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + adb_process = None + try: + # Need to force the use of the ADBCommand class's command + # since ADBDevice will redefine command and call its + # own version otherwise. + adb_process = ADBCommand.command( + self, cmds, device_serial=device_serial, timeout=timeout + ) + if adb_process.timedout: + raise ADBTimeoutError("%s" % adb_process) + if adb_process.exitcode: + raise ADBProcessError(adb_process) + output = adb_process.stdout + if self._verbose: + self._logger.debug( + "command_output: %s, " + "timeout: %s, " + "timedout: %s, " + "exitcode: %s, output: %s" + % ( + " ".join(adb_process.args), + timeout, + adb_process.timedout, + adb_process.exitcode, + output, + ) + ) + + return output + finally: + if adb_process and isinstance(adb_process.stdout_file, io.IOBase): + adb_process.stdout_file.close() + + +class ADBHost(ADBCommand): + """ADBHost provides a basic interface to adb host commands + which do not target a specific device. + + :param str adb: path to adb executable. Defaults to 'adb'. + :param str adb_host: host of the adb server. + :param int adb_port: port of the adb server. + :param logger_name: logging logger name. Defaults to 'adb'. + :param int timeout: The default maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value defaults to 300. + :param bool verbose: provide verbose output + :raises: :exc:`ADBError` + :exc:`ADBTimeoutError` + + :: + + from mozdevice import ADBHost + + adbhost = ADBHost() + adbhost.start_server() + """ + + def __init__( + self, + adb="adb", + adb_host=None, + adb_port=None, + logger_name="adb", + timeout=300, + verbose=False, + ): + ADBCommand.__init__( + self, + adb=adb, + adb_host=adb_host, + adb_port=adb_port, + logger_name=logger_name, + timeout=timeout, + verbose=verbose, + use_root=True, + ) + + def command(self, cmds, timeout=None): + """Executes an adb command on the host. + + :param list cmds: The command and its arguments to be + executed. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBHost constructor is used. + :return: :class:`ADBProcess` + + command() provides a low level interface for executing + commands on the host via adb. + + command() executes on the host in such a fashion that stdout + of the adb process is a file handle on the host and + the exit code is available as the exit code of the adb + process. + + The caller provides a list containing commands, as well as a + timeout period in seconds. + + A subprocess is spawned to execute adb with stdout and stderr + directed to a temporary file. If the process takes longer than + the specified timeout, the process is terminated. + + It is the caller's responsibilty to clean up by closing + the stdout temporary file. + """ + return ADBCommand.command(self, cmds, timeout=timeout) + + def command_output(self, cmds, timeout=None): + """Executes an adb command on the host returning stdout. + + :param list cmds: The command and its arguments to be + executed. + :param int timeout: The maximum time in seconds + for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBHost constructor is used. + :return: str - content of stdout. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + return ADBCommand.command_output(self, cmds, timeout=timeout) + + def start_server(self, timeout=None): + """Starts the adb server. + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBHost constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + + Attempting to use start_server with any adb_host value other than None + will fail with an ADBError exception. + + You will need to start the server on the remote host via the command: + + .. code-block:: shell + + adb -a fork-server server + + If you wish the remote adb server to restart automatically, you can + enclose the command in a loop as in: + + .. code-block:: shell + + while true; do + adb -a fork-server server + done + """ + self.command_output(["start-server"], timeout=timeout) + + def kill_server(self, timeout=None): + """Kills the adb server. + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBHost constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + self.command_output(["kill-server"], timeout=timeout) + + def devices(self, timeout=None): + """Executes adb devices -l and returns a list of objects describing attached devices. + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBHost constructor is used. + :return: an object contain + :raises: :exc:`ADBTimeoutError` + :exc:`ADBListDevicesError` + :exc:`ADBError` + + The output of adb devices -l + + :: + + $ adb devices -l + List of devices attached + b313b945 device usb:1-7 product:d2vzw model:SCH_I535 device:d2vzw + + is parsed and placed into an object as in + + :: + + [{'device_serial': 'b313b945', 'state': 'device', 'product': 'd2vzw', + 'usb': '1-7', 'device': 'd2vzw', 'model': 'SCH_I535' }] + """ + # b313b945 device usb:1-7 product:d2vzw model:SCH_I535 device:d2vzw + # from Android system/core/adb/transport.c statename() + re_device_info = re.compile( + r"([^\s]+)\s+(offline|bootloader|device|host|recovery|sideload|" + "no permissions|unauthorized|unknown)" + ) + devices = [] + lines = self.command_output(["devices", "-l"], timeout=timeout).splitlines() + for line in lines: + if line == "List of devices attached ": + continue + match = re_device_info.match(line) + if match: + device = {"device_serial": match.group(1), "state": match.group(2)} + remainder = line[match.end(2) :].strip() + if remainder: + try: + device.update( + dict([j.split(":") for j in remainder.split(" ")]) + ) + except ValueError: + self._logger.warning( + "devices: Unable to parse " "remainder for device %s" % line + ) + devices.append(device) + for device in devices: + if device["state"] == "no permissions": + raise ADBListDevicesError( + "No permissions to detect devices. You should restart the" + " adb server as root:\n" + "\n# adb kill-server\n# adb start-server\n" + "\nor maybe configure your udev rules.", + devices, + ) + return devices + + +ADBDEVICES = {} + + +def ADBDeviceFactory( + device=None, + adb="adb", + adb_host=None, + adb_port=None, + test_root=None, + logger_name="adb", + timeout=300, + verbose=False, + device_ready_retry_wait=20, + device_ready_retry_attempts=3, + use_root=True, + share_test_root=True, + run_as_package=None, +): + """ADBDeviceFactory provides a factory for :class:`ADBDevice` + instances that enforces the requirement that only one + :class:`ADBDevice` be created for each attached device. It uses + the identical arguments as the :class:`ADBDevice` + constructor. This is also used to ensure that the device's + test_root is initialized to an empty directory before tests are + run on the device. + + :return: :class:`ADBDevice` + :raises: :exc:`ADBDeviceFactoryError` + :exc:`ADBError` + :exc:`ADBTimeoutError` + + """ + device = device or os.environ.get("ANDROID_SERIAL") + if device is not None and device in ADBDEVICES: + # We have already created an ADBDevice for this device, just re-use it. + adbdevice = ADBDEVICES[device] + elif device is None and ADBDEVICES: + # We did not specify the device serial number and we have + # already created an ADBDevice which means we must only have + # one device connected and we can re-use the existing ADBDevice. + devices = list(ADBDEVICES.keys()) + assert ( + len(devices) == 1 + ), "Only one device may be connected if the device serial number is not specified." + adbdevice = ADBDEVICES[devices[0]] + elif ( + device is not None + and device not in ADBDEVICES + or device is None + and not ADBDEVICES + ): + # The device has not had an ADBDevice created yet. + adbdevice = ADBDevice( + device=device, + adb=adb, + adb_host=adb_host, + adb_port=adb_port, + test_root=test_root, + logger_name=logger_name, + timeout=timeout, + verbose=verbose, + device_ready_retry_wait=device_ready_retry_wait, + device_ready_retry_attempts=device_ready_retry_attempts, + use_root=use_root, + share_test_root=share_test_root, + run_as_package=run_as_package, + ) + ADBDEVICES[adbdevice._device_serial] = adbdevice + else: + raise ADBDeviceFactoryError( + "Inconsistent ADBDeviceFactory: device: %s, ADBDEVICES: %s" + % (device, ADBDEVICES) + ) + # Clean the test root before testing begins. + if test_root: + adbdevice.rm( + posixpath.join(adbdevice.test_root, "*"), + recursive=True, + force=True, + timeout=timeout, + ) + # Sync verbose and update the logger configuration in case it has + # changed since the initial initialization + if verbose != adbdevice._verbose: + adbdevice._verbose = verbose + adbdevice._logger = adbdevice._get_logger(adbdevice._logger.name, verbose) + return adbdevice + + +class ADBDevice(ADBCommand): + """ADBDevice provides methods which can be used to interact with the + associated Android-based device. + + :param str device: When a string is passed in device, it + is interpreted as the device serial number. This form is not + compatible with devices containing a ":" in the serial; in + this case ValueError will be raised. When a dictionary is + passed it must have one or both of the keys "device_serial" + and "usb". This is compatible with the dictionaries in the + list returned by ADBHost.devices(). If the value of + device_serial is a valid serial not containing a ":" it will + be used to identify the device, otherwise the value of the usb + key, prefixed with "usb:" is used. If None is passed and + there is exactly one device attached to the host, that device + is used. If None is passed and ANDROID_SERIAL is set in the environment, + that device is used. If there is more than one device attached and + device is None and ANDROID_SERIAL is not set in the environment, ValueError + is raised. If no device is attached the constructor will block + until a device is attached or the timeout is reached. + :param str adb_host: host of the adb server to connect to. + :param int adb_port: port of the adb server to connect to. + :param str test_root: value containing the test root to be + used on the device. This value will be shared among all + instances of ADBDevice if share_test_root is True. + :param str logger_name: logging logger name. Defaults to 'adb' + :param int timeout: The default maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value defaults to 300. + :param bool verbose: provide verbose output + :param int device_ready_retry_wait: number of seconds to wait + between attempts to check if the device is ready after a + reboot. + :param integer device_ready_retry_attempts: number of attempts when + checking if a device is ready. + :param bool use_root: Use root if it is available on device + :param bool share_test_root: True if instance should share the + same test_root value with other ADBInstances. Defaults to True. + :param str run_as_package: Name of package to be used in run-as in liew of + using su. + :raises: :exc:`ADBError` + :exc:`ADBTimeoutError` + :exc:`ValueError` + + :: + + from mozdevice import ADBDevice + + adbdevice = ADBDevice() + print(adbdevice.list_files("/mnt/sdcard")) + if adbdevice.process_exist("org.mozilla.geckoview.test_runner"): + print("org.mozilla.geckoview.test_runner is running") + """ + + SOCKET_DIRECTION_REVERSE = "reverse" + + SOCKET_DIRECTION_FORWARD = "forward" + + # BUILTINS is used to determine which commands can not be executed + # via su or run-as. This set of possible builtin commands was + # obtained from `man builtin` on Linux. + BUILTINS = set( + [ + "alias", + "bg", + "bind", + "break", + "builtin", + "caller", + "cd", + "command", + "compgen", + "complete", + "compopt", + "continue", + "declare", + "dirs", + "disown", + "echo", + "enable", + "eval", + "exec", + "exit", + "export", + "false", + "fc", + "fg", + "getopts", + "hash", + "help", + "history", + "jobs", + "kill", + "let", + "local", + "logout", + "mapfile", + "popd", + "printf", + "pushd", + "pwd", + "read", + "readonly", + "return", + "set", + "shift", + "shopt", + "source", + "suspend", + "test", + "times", + "trap", + "true", + "type", + "typeset", + "ulimit", + "umask", + "unalias", + "unset", + "wait", + ] + ) + + def __init__( + self, + device=None, + adb="adb", + adb_host=None, + adb_port=None, + test_root=None, + logger_name="adb", + timeout=300, + verbose=False, + device_ready_retry_wait=20, + device_ready_retry_attempts=3, + use_root=True, + share_test_root=True, + run_as_package=None, + ): + global _TEST_ROOT + + ADBCommand.__init__( + self, + adb=adb, + adb_host=adb_host, + adb_port=adb_port, + logger_name=logger_name, + timeout=timeout, + verbose=verbose, + use_root=use_root, + ) + self._logger.info("Using adb %s" % self._adb_version) + self._device_serial = self._get_device_serial(device) + self._initial_test_root = test_root + self._share_test_root = share_test_root + if share_test_root and not _TEST_ROOT: + _TEST_ROOT = test_root + self._test_root = None + self._run_as_package = None + # Cache packages debuggable state. + self._debuggable_packages = {} + self._device_ready_retry_wait = device_ready_retry_wait + self._device_ready_retry_attempts = device_ready_retry_attempts + self._have_root_shell = False + self._have_su = False + self._have_android_su = False + self._selinux = None + self._re_internal_storage = None + + self._wait_for_boot_completed(timeout=timeout) + + # Record the start time of the ADBDevice initialization so we can + # determine if we should abort with an ADBTimeoutError if it is + # taking too long. + start_time = time.time() + + # Attempt to get the Android version as early as possible in order + # to work around differences in determining adb command exit codes + # in Android before and after Android 7. + self.version = 0 + while self.version < 1 and (time.time() - start_time) <= float(timeout): + try: + version = self.get_prop("ro.build.version.sdk", timeout=timeout) + self.version = int(version) + except ValueError: + self._logger.info("unexpected ro.build.version.sdk: '%s'" % version) + time.sleep(2) + if self.version < 1: + # note slightly different meaning to the ADBTimeoutError here (and above): + # failed to get valid (numeric) version string in all attempts in allowed time + raise ADBTimeoutError( + "ADBDevice: unable to determine ro.build.version.sdk." + ) + + self._mkdir_p = None + # Force the use of /system/bin/ls or /system/xbin/ls in case + # there is /sbin/ls which embeds ansi escape codes to colorize + # the output. Detect if we are using busybox ls. We want each + # entry on a single line and we don't want . or .. + ls_dir = "/system" + + # Using self.is_file is problematic on emulators either + # using ls or test to check for their existence. + # Executing the command to detect its existence works around + # any issues with ls or test. + boot_completed = False + while not boot_completed and (time.time() - start_time) <= float(timeout): + try: + self.shell_output("/system/bin/ls /system/bin/ls", timeout=timeout) + boot_completed = True + self._ls = "/system/bin/ls" + except ADBError as e1: + self._logger.debug("detect /system/bin/ls {}".format(e1)) + try: + self.shell_output( + "/system/xbin/ls /system/xbin/ls", timeout=timeout + ) + boot_completed = True + self._ls = "/system/xbin/ls" + except ADBError as e2: + self._logger.debug("detect /system/xbin/ls : {}".format(e2)) + if not boot_completed: + time.sleep(2) + if not boot_completed: + raise ADBError("ADBDevice.__init__: ls could not be found") + + # A race condition can occur especially with emulators where + # the device appears to be available but it has not completed + # mounting the sdcard. We can work around this by checking if + # the sdcard is missing when we attempt to ls it and retrying + # if it is not yet available. + boot_completed = False + while not boot_completed and (time.time() - start_time) <= float(timeout): + try: + self.shell_output("{} -1A {}".format(self._ls, ls_dir), timeout=timeout) + boot_completed = True + self._ls += " -1A" + except ADBError as e: + self._logger.debug("detect ls -1A: {}".format(e)) + if "No such file or directory" not in str(e): + boot_completed = True + self._ls += " -a" + if not boot_completed: + time.sleep(2) + if not boot_completed: + raise ADBTimeoutError("ADBDevice: /sdcard not found.") + + self._logger.info("%s supported" % self._ls) + + # builtin commands which do not exist as separate programs can + # not be executed using su or run-as. Remove builtin commands + # from self.BUILTINS which also exist as separate programs so + # that we will be able to execute them using su or run-as if + # necessary. + remove_builtins = set() + for builtin in self.BUILTINS: + try: + self.ls("/system/*bin/%s" % builtin, timeout=timeout) + self._logger.debug("Removing %s from BUILTINS" % builtin) + remove_builtins.add(builtin) + except ADBError: + pass + self.BUILTINS.difference_update(remove_builtins) + + # Do we have cp? + boot_completed = False + while not boot_completed and (time.time() - start_time) <= float(timeout): + try: + self.shell_output("cp --help", timeout=timeout) + boot_completed = True + self._have_cp = True + except ADBError as e: + if "not found" in str(e): + self._have_cp = False + boot_completed = True + elif "known option" in str(e): + self._have_cp = True + boot_completed = True + elif "invalid option" in str(e): + self._have_cp = True + boot_completed = True + if not boot_completed: + time.sleep(2) + if not boot_completed: + raise ADBTimeoutError("ADBDevice: cp not found.") + self._logger.info("Native cp support: %s" % self._have_cp) + + # Do we have chmod -R? + try: + self._chmod_R = False + re_recurse = re.compile(r"[-]R") + chmod_output = self.shell_output("chmod --help", timeout=timeout) + match = re_recurse.search(chmod_output) + if match: + self._chmod_R = True + except ADBError as e: + self._logger.debug("Check chmod -R: {}".format(e)) + match = re_recurse.search(str(e)) + if match: + self._chmod_R = True + self._logger.info("Native chmod -R support: {}".format(self._chmod_R)) + + # Do we have chown -R? + try: + self._chown_R = False + chown_output = self.shell_output("chown --help", timeout=timeout) + match = re_recurse.search(chown_output) + if match: + self._chown_R = True + except ADBError as e: + self._logger.debug("Check chown -R: {}".format(e)) + self._logger.info("Native chown -R support: {}".format(self._chown_R)) + + try: + cleared = self.shell_bool('logcat -P ""', timeout=timeout) + except ADBError: + cleared = False + if not cleared: + self._logger.info("Unable to turn off logcat chatty") + + # Do we have pidof? + if self.version < version_codes.N: + # unexpected pidof behavior observed on Android 6 in bug 1514363 + self._have_pidof = False + else: + boot_completed = False + while not boot_completed and (time.time() - start_time) <= float(timeout): + try: + self.shell_output("pidof --help", timeout=timeout) + boot_completed = True + self._have_pidof = True + except ADBError as e: + if "not found" in str(e): + self._have_pidof = False + boot_completed = True + elif "known option" in str(e): + self._have_pidof = True + boot_completed = True + if not boot_completed: + time.sleep(2) + if not boot_completed: + raise ADBTimeoutError("ADBDevice: pidof not found.") + # Bug 1529960 observed pidof intermittently returning no results for a + # running process on the 7.0 x86_64 emulator. + + characteristics = self.get_prop("ro.build.characteristics", timeout=timeout) + + abi = self.get_prop("ro.product.cpu.abi", timeout=timeout) + self._have_flaky_pidof = ( + self.version == version_codes.N + and abi == "x86_64" + and "emulator" in characteristics + ) + self._logger.info( + "Native {} pidof support: {}".format( + "flaky" if self._have_flaky_pidof else "normal", self._have_pidof + ) + ) + + if self._use_root: + # Detect if root is available, but do not fail if it is not. + # Catch exceptions due to the potential for segfaults + # calling su when using an improperly rooted device. + + self._check_adb_root(timeout=timeout) + + if not self._have_root_shell: + # To work around bug 1525401 where su -c id will return an + # exitcode of 1 if selinux permissive is not already in effect, + # we need su to turn off selinux prior to checking for su. + # We can use shell() directly to prevent the non-zero exitcode + # from raising an ADBError. + # Note: We are assuming su -c is supported and do not attempt to + # use su 0. + adb_process = self.shell("su -c setenforce 0") + self._logger.info( + "su -c setenforce 0 exitcode %s, stdout: %s" + % (adb_process.proc.poll(), adb_process.proc.stdout) + ) + + uid = "uid=0" + # Do we have a 'Superuser' sh like su? + try: + if self.shell_output("su -c id", timeout=timeout).find(uid) != -1: + self._have_su = True + self._logger.info("su -c supported") + except ADBError as e: + self._logger.debug("Check for su -c failed: {}".format(e)) + + # Check if Android's su 0 command works. + # su 0 id will hang on Pixel 2 8.1.0/OPM2.171019.029.B1/4720900 + # rooted via magisk. If we already have detected su -c support, + # we can skip this check. + try: + if ( + not self._have_su + and self.shell_output("su 0 id", timeout=timeout).find(uid) + != -1 + ): + self._have_android_su = True + self._logger.info("su 0 supported") + except ADBError as e: + self._logger.debug("Check for su 0 failed: {}".format(e)) + + # Guarantee that /data/local/tmp exists and is accessible to all. + # It is a fatal error if /data/local/tmp does not exist and can not be created. + if not self.exists("/data/local/tmp", timeout=timeout): + # parents=True is required on emulator, where exist() may be flaky + self.mkdir("/data/local/tmp", parents=True, timeout=timeout) + + # Beginning in Android 8.1 /data/anr/traces.txt no longer contains + # a single file traces.txt but instead will contain individual files + # for each stack. + # See https://github.com/aosp-mirror/platform_build/commit/ + # fbba7fe06312241c7eb8c592ec2ac630e4316d55 + stack_trace_dir = self.shell_output( + "getprop dalvik.vm.stack-trace-dir", timeout=timeout + ) + if not stack_trace_dir: + stack_trace_file = self.shell_output( + "getprop dalvik.vm.stack-trace-file", timeout=timeout + ) + if stack_trace_file: + stack_trace_dir = posixpath.dirname(stack_trace_file) + else: + stack_trace_dir = "/data/anr" + self.stack_trace_dir = stack_trace_dir + self.enforcing = "Permissive" + self.run_as_package = run_as_package + + self._logger.debug("ADBDevice: %s" % self.__dict__) + + @property + def is_rooted(self): + return self._have_root_shell or self._have_su or self._have_android_su + + def _wait_for_boot_completed(self, timeout=None): + """Internal method to wait for boot to complete. + + Wait for sys.boot_completed=1 and raise ADBError if boot does + not complete within retry attempts. + + :param int timeout: The default maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value defaults to 300. + :raises: :exc:`ADBError` + """ + for attempt in range(self._device_ready_retry_attempts): + sys_boot_completed = self.shell_output( + "getprop sys.boot_completed", timeout=timeout + ) + if sys_boot_completed == "1": + break + time.sleep(self._device_ready_retry_wait) + if sys_boot_completed != "1": + raise ADBError("Failed to complete boot in time") + + def _get_device_serial(self, device): + device = device or os.environ.get("ANDROID_SERIAL") + if device is None: + devices = ADBHost( + adb=self._adb_path, adb_host=self._adb_host, adb_port=self._adb_port + ).devices() + if len(devices) > 1: + raise ValueError( + "ADBDevice called with multiple devices " + "attached and no device specified" + ) + if len(devices) == 0: + raise ADBError("No connected devices found.") + device = devices[0] + + # Allow : in device serial if it matches a tcpip device serial. + re_device_serial_tcpip = re.compile(r"[^:]+:[0-9]+$") + + def is_valid_serial(serial): + return ( + serial.startswith("usb:") + or re_device_serial_tcpip.match(serial) is not None + or ":" not in serial + ) + + if isinstance(device, six.string_types): + # Treat this as a device serial + if not is_valid_serial(device): + raise ValueError( + "Device serials containing ':' characters are " + "invalid. Pass the output from " + "ADBHost.devices() for the device instead" + ) + return device + + serial = device.get("device_serial") + if serial is not None and is_valid_serial(serial): + return serial + usb = device.get("usb") + if usb is not None: + return "usb:%s" % usb + + raise ValueError("Unable to get device serial") + + def _check_root_user(self, timeout=None): + uid = "uid=0" + # Is shell already running as root? + try: + if self.shell_output("id", timeout=timeout).find(uid) != -1: + self._logger.info("adbd running as root") + return True + except ADBError: + self._logger.debug("Check for root user failed") + return False + + def _check_adb_root(self, timeout=None): + self._have_root_shell = self._check_root_user(timeout=timeout) + + # Exclude these devices from checking for a root shell due to + # potential hangs. + exclude_set = set() + exclude_set.add("E5823") # Sony Xperia Z5 Compact (E5823) + # Do we need to run adb root to get a root shell? + if not self._have_root_shell: + if self.get_prop("ro.product.model") in exclude_set: + self._logger.warning( + "your device was excluded from attempting adb root." + ) + else: + try: + self.command_output(["root"], timeout=timeout) + self._have_root_shell = self._check_root_user(timeout=timeout) + if self._have_root_shell: + self._logger.info("adbd restarted as root") + else: + self._logger.info("adbd not restarted as root") + except ADBError: + self._logger.debug("Check for root adbd failed") + + def pidof(self, app_name, timeout=None): + """ + Return a list of pids for all extant processes running within the + specified application package. + + :param str app_name: The name of the application package to examine + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per + adb call. The total time spent may exceed this + value. If it is not specified, the value set + in the ADBDevice constructor is used. + :return: List of integers containing the pid(s) of the various processes. + :raises: :exc:`ADBTimeoutError` + """ + if self._have_pidof: + try: + pid_output = self.shell_output("pidof %s" % app_name, timeout=timeout) + re_pids = re.compile(r"[0-9]+") + pids = re_pids.findall(pid_output) + if self._have_flaky_pidof and not pids: + time.sleep(0.1) + pid_output = self.shell_output( + "pidof %s" % app_name, timeout=timeout + ) + pids = re_pids.findall(pid_output) + except ADBError: + pids = [] + else: + procs = self.get_process_list(timeout=timeout) + # limit the comparion to the first 75 characters due to a + # limitation in processname length in android. + pids = [proc[0] for proc in procs if proc[1] == app_name[:75]] + + return [int(pid) for pid in pids] + + def _sync(self, timeout=None): + """Sync the file system using shell_output in order that exceptions + are raised to the caller.""" + self.shell_output("sync", timeout=timeout) + + @staticmethod + def _should_quote(arg): + """Utility function if command argument should be quoted.""" + if not arg: + return False + if arg[0] == "'" and arg[-1] == "'" or arg[0] == '"' and arg[-1] == '"': + # Already quoted + return False + re_quotable_chars = re.compile(r"[ ()\"&'\];]") + return re_quotable_chars.search(arg) + + @staticmethod + def _quote(arg): + """Utility function to return quoted version of command argument.""" + if hasattr(shlex, "quote"): + quote = shlex.quote + elif hasattr(pipes, "quote"): + quote = pipes.quote + else: + + def quote(arg): + arg = arg or "" + re_unsafe = re.compile(r"[^\w@%+=:,./-]") + if re_unsafe.search(arg): + arg = "'" + arg.replace("'", "'\"'\"'") + "'" + return arg + + return quote(arg) + + @staticmethod + def _escape_command_line(cmds): + """Utility function which takes a list of command arguments and returns + escaped and quoted version of the command as a string. + """ + assert isinstance(cmds, list) + # This is identical to shlex.join in Python 3.8. We can + # replace it should we ever get Python 3.8 as a minimum. + quoted_cmd = " ".join([ADBDevice._quote(arg) for arg in cmds]) + + return quoted_cmd + + @staticmethod + def _get_exitcode(file_obj): + """Get the exitcode from the last line of the file_obj for shell + commands executed on Android prior to Android 7. + """ + re_returncode = re.compile(r"adb_returncode=([0-9]+)") + file_obj.seek(0, os.SEEK_END) + + line = "" + length = file_obj.tell() + offset = 1 + while length - offset >= 0: + file_obj.seek(-offset, os.SEEK_END) + char = six.ensure_str(file_obj.read(1)) + if not char: + break + if char != "\r" and char != "\n": + line = char + line + elif line: + # we have collected everything up to the beginning of the line + break + offset += 1 + match = re_returncode.match(line) + if match: + exitcode = int(match.group(1)) + # Set the position in the file to the position of the + # adb_returncode and truncate it from the output. + file_obj.seek(-1, os.SEEK_CUR) + file_obj.truncate() + else: + exitcode = None + # We may have a situation where the adb_returncode= is not + # at the end of the output. This happens at least in the + # failure jit-tests on arm. To work around this + # possibility, we can search the entire output for the + # appropriate match. + file_obj.seek(0, os.SEEK_SET) + for line in file_obj: + line = six.ensure_str(line) + match = re_returncode.search(line) + if match: + exitcode = int(match.group(1)) + break + # Reset the position in the file to the end. + file_obj.seek(0, os.SEEK_END) + + return exitcode + + def is_path_internal_storage(self, path, timeout=None): + """ + Return True if the path matches an internal storage path + as defined by either '/sdcard', '/mnt/sdcard', or any of the + .*_STORAGE environment variables on the device otherwise False. + + :param str path: The path to test. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBDevice constructor is used. + :return: boolean + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + if not self._re_internal_storage: + storage_dirs = set(["/mnt/sdcard", "/sdcard"]) + re_STORAGE = re.compile("([^=]+STORAGE)=(.*)") + lines = self.shell_output("set", timeout=timeout).split() + for line in lines: + m = re_STORAGE.match(line.strip()) + if m and m.group(2): + storage_dirs.add(m.group(2)) + self._re_internal_storage = re.compile("/|".join(list(storage_dirs)) + "/") + return self._re_internal_storage.match(path) is not None + + def is_package_debuggable(self, package): + if not package: + return False + + if not self.is_app_installed(package): + self._logger.warning( + "Can not check if package %s is debuggable as it is not installed." + % package + ) + return False + + if package in self._debuggable_packages: + return self._debuggable_packages[package] + + try: + self.shell_output("run-as %s ls /system" % package) + self._debuggable_packages[package] = True + except ADBError as e: + self._debuggable_packages[package] = False + self._logger.warning("Package %s is not debuggable: %s" % (package, str(e))) + return self._debuggable_packages[package] + + @property + def package_dir(self): + if not self._run_as_package: + return None + # If we have a debuggable app and can use its directory to + # locate the test_root, this returns the location of the app's + # directory. If it is not located in the default location this + # will not be correct. + return "/data/data/%s" % self._run_as_package + + @property + def run_as_package(self): + """Returns the name of the package which will be used in run-as to change + the effective user executing a command.""" + return self._run_as_package + + @run_as_package.setter + def run_as_package(self, value): + if self._have_root_shell or self._have_su or self._have_android_su: + # When we have root available, use that instead of run-as. + return + + if self._run_as_package == value: + # Do nothing if the value doesn't change. + return + + if not value: + if self._test_root: + # Make sure the old test_root is clean without using + # the test_root property getter. + self.rm( + posixpath.join(self._test_root, "*"), recursive=True, force=True + ) + self._logger.info( + "Setting run_as_package to None. Resetting test root from %s to %s" + % (self._test_root, self._initial_test_root) + ) + self._run_as_package = None + # We must set _run_as_package to None before assigning to + # self.test_root in order to prevent attempts to use + # run-as. + self.test_root = self._initial_test_root + if self._test_root: + # Make sure the new test_root is clean. + self.rm( + posixpath.join(self._test_root, "*"), recursive=True, force=True + ) + return + + if not self.is_package_debuggable(value): + self._logger.warning( + "Can not set run_as_package to %s since it is not debuggable." % value + ) + # Since we are attempting to set run_as_package assume + # that we are not rooted and do not include + # /data/local/tmp as an option when checking for possible + # test_root paths using external storage. + paths = [ + "/storage/emulated/0/Android/data/%s/test_root" % value, + "/sdcard/test_root", + "/mnt/sdcard/test_root", + ] + self._try_test_root_candidates(paths) + return + + # Require these devices to have Verify bytecode turned off due to failures with run-as. + include_set = set() + include_set.add("SM-G973F") # Samsung S10g SM-G973F + + if ( + self.get_prop("ro.product.model") in include_set + and self.shell_output("settings get global art_verifier_verify_debuggable") + == "1" + ): + self._logger.warning( + """Your device has Verify bytecode of debuggable apps set which + causes problems attempting to use run-as to delegate command execution to debuggable + apps. You must turn this setting off in Developer options on your device. + """ + ) + raise ADBError( + "Verify bytecode of debuggable apps must be turned off to use run-as" + ) + + self._logger.info("Setting run_as_package to %s" % value) + + self._run_as_package = value + old_test_root = self._test_root + new_test_root = posixpath.join(self.package_dir, "test_root") + if old_test_root != new_test_root: + try: + # Make sure the old test_root is clean. + if old_test_root: + self.rm( + posixpath.join(old_test_root, "*"), recursive=True, force=True + ) + self.test_root = posixpath.join(self.package_dir, "test_root") + # Make sure the new test_root is clean. + self.rm(posixpath.join(self.test_root, "*"), recursive=True, force=True) + except ADBError as e: + # There was a problem using run-as to initialize + # the new test_root in the app's directory. + # Restore the old test root and raise an ADBError. + self._run_as_package = None + self.test_root = old_test_root + self._logger.warning( + "Exception %s setting test_root to %s. " + "Resetting test_root to %s." + % (str(e), new_test_root, old_test_root) + ) + raise ADBError( + "Unable to initialize test root while setting run_as_package %s" + % value + ) + + def enable_run_as_for_path(self, path): + return self._run_as_package is not None and path.startswith(self.package_dir) + + @property + def test_root(self): + """ + The test_root property returns the directory on the device where + temporary test files are stored. + + The first time test_root it is called it determines and caches a value + for the test root on the device. It determines the appropriate test + root by attempting to create a 'proof' directory on each of a list of + directories and returning the first successful directory as the + test_root value. The cached value for the test_root will be shared + by subsequent instances of ADBDevice if self._share_test_root is True. + + The default list of directories checked by test_root are: + + If the device is rooted: + - /data/local/tmp/test_root + + If run_as_package is not available and the device is not rooted: + + - /data/local/tmp/test_root + - /sdcard/test_root + - /storage/sdcard/test_root + - /mnt/sdcard/test_root + + You may override the default list by providing a test_root argument to + the :class:`ADBDevice` constructor which will then be used when + attempting to create the 'proof' directory. + + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + if self._test_root is not None: + self._logger.debug("Using cached test_root %s" % self._test_root) + return self._test_root + + if self.run_as_package is not None: + raise ADBError( + "run_as_package is %s however test_root is None" % self.run_as_package + ) + + if self._share_test_root and _TEST_ROOT: + self._logger.debug( + "Attempting to use shared test_root %s" % self._test_root + ) + paths = [_TEST_ROOT] + elif self._initial_test_root is not None: + self._logger.debug( + "Attempting to use initial test_root %s" % self._test_root + ) + paths = [self._initial_test_root] + else: + # Android 10's scoped storage means we can no longer + # reliably host profiles and tests on the sdcard though it + # depends on the device. See + # https://developer.android.com/training/data-storage#scoped-storage + # Also see RunProgram in + # python/mozbuild/mozbuild/mach_commands.py where they + # choose /data/local/tmp as the default location for the + # profile because GeckoView only takes its configuration + # file from /data/local/tmp. Since we have not specified + # a run_as_package yet, assume we may be attempting to use + # a shell program which creates files owned by the shell + # user and which would work using /data/local/tmp/ even if + # the device is not rooted. Fall back to external storage + # if /data/local/tmp is not available. + paths = ["/data/local/tmp/test_root"] + if not self.is_rooted: + # Note that /sdcard may be accessible while + # /mnt/sdcard is not. + paths.extend( + [ + "/sdcard/test_root", + "/storage/sdcard/test_root", + "/mnt/sdcard/test_root", + ] + ) + + return self._try_test_root_candidates(paths) + + @test_root.setter + def test_root(self, value): + # Cache the requested test root so that + # other invocations of ADBDevice will pick + # up the same value. + global _TEST_ROOT + if self._test_root == value: + return + self._logger.debug("Setting test_root from %s to %s" % (self._test_root, value)) + old_test_root = self._test_root + self._test_root = value + if self._share_test_root: + _TEST_ROOT = value + if not value: + return + if not self._try_test_root(value): + self._test_root = old_test_root + raise ADBError("Unable to set test_root to %s" % value) + readme = posixpath.join(value, "README") + if not self.is_file(readme): + tmpf = tempfile.NamedTemporaryFile(mode="w", delete=False) + tmpf.write( + "This directory is used by mozdevice to contain all content " + "related to running tests on this device.\n" + ) + tmpf.close() + try: + self.push(tmpf.name, readme) + finally: + if tmpf: + os.unlink(tmpf.name) + + def _try_test_root_candidates(self, paths): + max_attempts = 3 + for test_root in paths: + for attempt in range(1, max_attempts + 1): + self._logger.debug( + "Setting test root to %s attempt %d of %d" + % (test_root, attempt, max_attempts) + ) + + if self._try_test_root(test_root): + if not self._test_root: + # Cache the detected test_root so that we can + # restore the value without having re-run + # _try_test_root. + self._initial_test_root = test_root + self._test_root = test_root + self._logger.info("Setting test_root to %s" % self._test_root) + return self._test_root + + self._logger.debug( + "_setup_test_root: " + "Attempt %d of %d failed to set test_root to %s" + % (attempt, max_attempts, test_root) + ) + + if attempt != max_attempts: + time.sleep(20) + + raise ADBError( + "Unable to set up test root using paths: [%s]" % ", ".join(paths) + ) + + def _try_test_root(self, test_root): + try: + if not self.is_dir(test_root): + self.mkdir(test_root, parents=True) + proof_dir = posixpath.join(test_root, "proof") + if self.is_dir(proof_dir): + self.rm(proof_dir, recursive=True) + self.mkdir(proof_dir) + self.rm(proof_dir, recursive=True) + except ADBError as e: + self._logger.warning("%s is not writable: %s" % (test_root, str(e))) + return False + + return True + + # Host Command methods + + def command(self, cmds, timeout=None): + """Executes an adb command on the host against the device. + + :param list cmds: The command and its arguments to be + executed. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBDevice constructor is used. + :return: :class:`ADBProcess` + + command() provides a low level interface for executing + commands for a specific device on the host via adb. + + command() executes on the host in such a fashion that stdout + of the adb process are file handles on the host and + the exit code is available as the exit code of the adb + process. + + For executing shell commands on the device, use + ADBDevice.shell(). The caller provides a list containing + commands, as well as a timeout period in seconds. + + A subprocess is spawned to execute adb for the device with + stdout and stderr directed to a temporary file. If the process + takes longer than the specified timeout, the process is + terminated. + + It is the caller's responsibilty to clean up by closing + the stdout temporary file. + """ + + return ADBCommand.command( + self, cmds, device_serial=self._device_serial, timeout=timeout + ) + + def command_output(self, cmds, timeout=None): + """Executes an adb command on the host against the device returning + stdout. + + :param list cmds: The command and its arguments to be executed. + :param int timeout: The maximum time in seconds + for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: str - content of stdout. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + return ADBCommand.command_output( + self, cmds, device_serial=self._device_serial, timeout=timeout + ) + + # Networking methods + + def _validate_port(self, port, is_local=True): + """Validate a port forwarding specifier. Raises ValueError on failure. + + :param str port: The port specifier to validate + :param bool is_local: Flag indicating whether the port represents a local port. + """ + prefixes = ["tcp", "localabstract", "localreserved", "localfilesystem", "dev"] + + if not is_local: + prefixes += ["jdwp"] + + parts = port.split(":", 1) + if len(parts) != 2 or parts[0] not in prefixes: + raise ValueError("Invalid port specifier %s" % port) + + def _validate_direction(self, direction): + """Validate direction of the socket connection. Raises ValueError on failure. + + :param str direction: The socket direction specifier to validate + :raises: :exc:`ValueError` + """ + if direction not in [ + self.SOCKET_DIRECTION_FORWARD, + self.SOCKET_DIRECTION_REVERSE, + ]: + raise ValueError("Invalid direction specifier {}".format(direction)) + + def create_socket_connection( + self, direction, local, remote, allow_rebind=True, timeout=None + ): + """Sets up a socket connection in the specified direction. + + :param str direction: Direction of the socket connection + :param str local: Local port + :param str remote: Remote port + :param bool allow_rebind: Do not fail if port is already bound + :param int timeout: The maximum time in seconds + for any spawned adb process to complete before throwing + an ADBTimeoutError. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: When forwarding from "tcp:0", an int containing the port number + of the local port assigned by adb, otherwise None. + :raises: :exc:`ValueError` + :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + # validate socket direction, and local and remote port formatting. + self._validate_direction(direction) + for port, is_local in [(local, True), (remote, False)]: + self._validate_port(port, is_local=is_local) + + cmd = [direction, local, remote] + + if not allow_rebind: + cmd.insert(1, "--no-rebind") + + # execute commands to establish socket connection. + cmd_output = self.command_output(cmd, timeout=timeout) + + # If we want to forward using local port "tcp:0", then we're letting + # adb assign the port for us, so we need to return that assignment. + if ( + direction == self.SOCKET_DIRECTION_FORWARD + and local == "tcp:0" + and cmd_output + ): + return int(cmd_output) + + return None + + def list_socket_connections(self, direction, timeout=None): + """Return a list of tuples specifying active socket connectionss. + + Return values are of the form (device, local, remote). + + :param str direction: 'forward' to list forward socket connections + 'reverse' to list reverse socket connections + :param int timeout: The maximum time in seconds + for any spawned adb process to complete before throwing + an ADBTimeoutError. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ValueError` + :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + self._validate_direction(direction) + + cmd = [direction, "--list"] + output = self.command_output(cmd, timeout=timeout) + return [tuple(line.split(" ")) for line in output.splitlines() if line.strip()] + + def remove_socket_connections(self, direction, local=None, timeout=None): + """Remove existing socket connections for a given direction. + + :param str direction: 'forward' to remove forward socket connection + 'reverse' to remove reverse socket connection + :param str local: local port specifier as for ADBDevice.forward. If local + is not specified removes all forwards. + :param int timeout: The maximum time in seconds + for any spawned adb process to complete before throwing + an ADBTimeoutError. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ValueError` + :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + self._validate_direction(direction) + + cmd = [direction] + + if local is None: + cmd.extend(["--remove-all"]) + else: + self._validate_port(local, is_local=True) + cmd.extend(["--remove", local]) + + self.command_output(cmd, timeout=timeout) + + # Legacy port forward methods + + def forward(self, local, remote, allow_rebind=True, timeout=None): + """Forward a local port to a specific port on the device. + + :return: When forwarding from "tcp:0", an int containing the port number + of the local port assigned by adb, otherwise None. + + See `ADBDevice.create_socket_connection`. + """ + return self.create_socket_connection( + self.SOCKET_DIRECTION_FORWARD, local, remote, allow_rebind, timeout + ) + + def list_forwards(self, timeout=None): + """Return a list of tuples specifying active forwards. + + See `ADBDevice.list_socket_connection`. + """ + return self.list_socket_connections(self.SOCKET_DIRECTION_FORWARD, timeout) + + def remove_forwards(self, local=None, timeout=None): + """Remove existing port forwards. + + See `ADBDevice.remove_socket_connection`. + """ + self.remove_socket_connections(self.SOCKET_DIRECTION_FORWARD, local, timeout) + + # Legacy port reverse methods + + def reverse(self, local, remote, allow_rebind=True, timeout=None): + """Sets up a reverse socket connection from device to host. + + See `ADBDevice.create_socket_connection`. + """ + self.create_socket_connection( + self.SOCKET_DIRECTION_REVERSE, local, remote, allow_rebind, timeout + ) + + def list_reverses(self, timeout=None): + """Returns a list of tuples showing active reverse socket connections. + + See `ADBDevice.list_socket_connection`. + """ + return self.list_socket_connections(self.SOCKET_DIRECTION_REVERSE, timeout) + + def remove_reverses(self, local=None, timeout=None): + """Remove existing reverse socket connections. + + See `ADBDevice.remove_socket_connection`. + """ + self.remove_socket_connections(self.SOCKET_DIRECTION_REVERSE, local, timeout) + + # Device Shell methods + + def shell( + self, + cmd, + env=None, + cwd=None, + timeout=None, + stdout_callback=None, + yield_stdout=None, + enable_run_as=False, + ): + """Executes a shell command on the device. + + :param str cmd: The command to be executed. + :param dict env: Contains the environment variables and + their values. + :param str cwd: The directory from which to execute. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBDevice constructor is used. + :param function stdout_callback: Function called for each line of output. + :param bool yield_stdout: Flag used to make the returned process + iteratable. The return process can be used in a loop to get the output + and the loop would exit as the process ends. + :param bool enable_run_as: Flag used to temporarily enable use + of run-as to execute the command. + :return: :class:`ADBProcess` + + shell() provides a low level interface for executing commands + on the device via adb shell. + + shell() executes on the host in such as fashion that stdout + contains the stdout and stderr of the host abd process + combined with the stdout and stderr of the shell command + on the device. The exit code of shell() is the exit code of + the adb command if it was non-zero or the extracted exit code + from the output of the shell command executed on the + device. + + The caller provides a flag indicating if the command is to be + executed as root, a string for any requested working + directory, a hash defining the environment, a string + containing shell commands, as well as a timeout period in + seconds. + + The command line to be executed is created to set the current + directory, set the required environment variables, optionally + execute the command using su and to output the return code of + the command to stdout. The command list is created as a + command sequence separated by && which will terminate the + command sequence on the first command which returns a non-zero + exit code. + + A subprocess is spawned to execute adb shell for the device + with stdout and stderr directed to a temporary file. If the + process takes longer than the specified timeout, the process + is terminated. The return code is extracted from the stdout + and is then removed from the file. + + It is the caller's responsibilty to clean up by closing + the stdout temporary files. + + If the yield_stdout flag is set, then the returned ADBProcess + can be iterated over to get the output as it is produced by + adb command. The iterator ends when the process timed out or + if it exited. This flag is incompatible with stdout_callback. + + """ + + def _timed_read_line_handler(signum, frame): + raise IOError("ReadLineTimeout") + + def _timed_read_line(filehandle, timeout=None): + """ + Attempt to readline from filehandle. If readline does not return + within timeout seconds, raise IOError('ReadLineTimeout'). + On Windows, required signal facilities are usually not available; + as a result, the timeout is not respected and some reads may + block on Windows. + """ + if not hasattr(signal, "SIGALRM"): + return filehandle.readline() + if timeout is None: + timeout = 5 + line = "" + default_alarm_handler = signal.getsignal(signal.SIGALRM) + signal.signal(signal.SIGALRM, _timed_read_line_handler) + signal.alarm(int(timeout)) + try: + line = filehandle.readline() + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, default_alarm_handler) + return line + + first_word = cmd.split(" ")[0] + if first_word in self.BUILTINS: + # Do not attempt to use su or run-as with builtin commands + pass + elif self._have_root_shell: + pass + elif self._have_android_su: + cmd = "su 0 %s" % cmd + elif self._have_su: + cmd = "su -c %s" % ADBDevice._quote(cmd) + elif self._run_as_package and enable_run_as: + cmd = "run-as %s %s" % (self._run_as_package, cmd) + else: + pass + + # prepend cwd and env to command if necessary + if cwd: + cmd = "cd %s && %s" % (cwd, cmd) + if env: + envstr = "&& ".join(["export %s=%s" % (x[0], x[1]) for x in env.items()]) + cmd = envstr + "&& " + cmd + # Before Android 7, an exitcode 0 for the process on the host + # did not mean that the exitcode of the Android process was + # also 0. We therefore used the echo adb_returncode=$? hack to + # obtain it there. However Android 7 and later intermittently + # do not emit the adb_returncode in stdout using this hack. In + # Android 7 and later the exitcode of the host process does + # match the exitcode of the Android process and we can use it + # directly. + if ( + self._device_serial.startswith("emulator") + or not hasattr(self, "version") + or self.version < version_codes.N + ): + cmd += "; echo adb_returncode=$?" + + args = [self._adb_path] + if self._adb_host: + args.extend(["-H", self._adb_host]) + if self._adb_port: + args.extend(["-P", str(self._adb_port)]) + if self._device_serial: + args.extend(["-s", self._device_serial]) + args.extend(["wait-for-device", "shell", cmd]) + + if timeout is None: + timeout = self._timeout + + if yield_stdout: + # When using yield_stdout, rely on the timeout implemented in + # ADBProcess instead of relying on our own here. + assert not stdout_callback + return ADBProcess(args, use_stdout_pipe=yield_stdout, timeout=timeout) + else: + adb_process = ADBProcess(args) + + start_time = time.time() + exitcode = adb_process.proc.poll() + if not stdout_callback: + while ((time.time() - start_time) <= float(timeout)) and exitcode is None: + time.sleep(self._polling_interval) + exitcode = adb_process.proc.poll() + else: + stdout2 = io.open(adb_process.stdout_file.name, "rb") + partial = b"" + while ((time.time() - start_time) <= float(timeout)) and exitcode is None: + try: + line = _timed_read_line(stdout2) + if line and len(line) > 0: + if line.endswith(b"\n") or line.endswith(b"\r"): + line = partial + line + partial = b"" + line = line.rstrip() + if self._verbose: + self._logger.info(six.ensure_str(line)) + stdout_callback(line) + else: + # no more output available now, but more to come? + partial = partial + line + else: + # no new output, so sleep and poll + time.sleep(self._polling_interval) + except IOError: + pass + exitcode = adb_process.proc.poll() + if exitcode is None: + adb_process.proc.kill() + adb_process.timedout = True + adb_process.exitcode = adb_process.proc.poll() + elif exitcode == 0: + if ( + not self._device_serial.startswith("emulator") + and hasattr(self, "version") + and self.version >= version_codes.N + ): + adb_process.exitcode = 0 + else: + adb_process.exitcode = self._get_exitcode(adb_process.stdout_file) + else: + adb_process.exitcode = exitcode + + if stdout_callback: + line = stdout2.readline() + while line: + if line.endswith(b"\n") or line.endswith(b"\r"): + line = partial + line + partial = b"" + stdout_callback(line.rstrip()) + else: + # no more output available now, but more to come? + partial = partial + line + line = stdout2.readline() + if partial: + stdout_callback(partial) + stdout2.close() + + adb_process.stdout_file.seek(0, os.SEEK_SET) + + return adb_process + + def shell_bool(self, cmd, env=None, cwd=None, timeout=None, enable_run_as=False): + """Executes a shell command on the device returning True on success + and False on failure. + + :param str cmd: The command to be executed. + :param dict env: Contains the environment variables and + their values. + :param str cwd: The directory from which to execute. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :param bool enable_run_as: Flag used to temporarily enable use + of run-as to execute the command. + :return: bool + :raises: :exc:`ADBTimeoutError` + """ + adb_process = None + try: + adb_process = self.shell( + cmd, env=env, cwd=cwd, timeout=timeout, enable_run_as=enable_run_as + ) + if adb_process.timedout: + raise ADBTimeoutError("%s" % adb_process) + return adb_process.exitcode == 0 + finally: + if adb_process: + if self._verbose: + output = adb_process.stdout + self._logger.debug( + "shell_bool: %s, " + "timeout: %s, " + "timedout: %s, " + "exitcode: %s, " + "output: %s" + % ( + " ".join(adb_process.args), + timeout, + adb_process.timedout, + adb_process.exitcode, + output, + ) + ) + + adb_process.stdout_file.close() + + def shell_output(self, cmd, env=None, cwd=None, timeout=None, enable_run_as=False): + """Executes an adb shell on the device returning stdout. + + :param str cmd: The command to be executed. + :param dict env: Contains the environment variables and their values. + :param str cwd: The directory from which to execute. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per + adb call. The total time spent may exceed this + value. If it is not specified, the value set + in the ADBDevice constructor is used. + :param bool enable_run_as: Flag used to temporarily enable use + of run-as to execute the command. + :return: str - content of stdout. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + adb_process = None + try: + adb_process = self.shell( + cmd, env=env, cwd=cwd, timeout=timeout, enable_run_as=enable_run_as + ) + if adb_process.timedout: + raise ADBTimeoutError("%s" % adb_process) + if adb_process.exitcode: + raise ADBProcessError(adb_process) + output = adb_process.stdout + if self._verbose: + self._logger.debug( + "shell_output: %s, " + "timeout: %s, " + "timedout: %s, " + "exitcode: %s, " + "output: %s" + % ( + " ".join(adb_process.args), + timeout, + adb_process.timedout, + adb_process.exitcode, + output, + ) + ) + + return output + finally: + if adb_process and isinstance(adb_process.stdout_file, io.IOBase): + adb_process.stdout_file.close() + + # Informational methods + + def _get_logcat_buffer_args(self, buffers): + valid_buffers = set(["radio", "main", "events"]) + invalid_buffers = set(buffers).difference(valid_buffers) + if invalid_buffers: + raise ADBError( + "Invalid logcat buffers %s not in %s " + % (list(invalid_buffers), list(valid_buffers)) + ) + args = [] + for b in buffers: + args.extend(["-b", b]) + return args + + def clear_logcat(self, timeout=None, buffers=[]): + """Clears logcat via adb logcat -c. + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per + adb call. The total time spent may exceed this + value. If it is not specified, the value set + in the ADBDevice constructor is used. + :param list buffers: Log buffers to clear. Valid buffers are + "radio", "events", and "main". Defaults to "main". + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + buffers = self._get_logcat_buffer_args(buffers) + cmds = ["logcat", "-c"] + buffers + try: + self.command_output(cmds, timeout=timeout) + self.shell_output("log logcat cleared", timeout=timeout) + except ADBTimeoutError: + raise + except ADBProcessError as e: + if "failed to clear" not in str(e): + raise + self._logger.warning( + "retryable logcat clear error?: {}. Retrying...".format(str(e)) + ) + try: + self.command_output(cmds, timeout=timeout) + self.shell_output("log logcat cleared", timeout=timeout) + except ADBProcessError as e2: + if "failed to clear" not in str(e): + raise + self._logger.warning( + "Ignoring failure to clear logcat: {}.".format(str(e2)) + ) + + def get_logcat( + self, + filter_specs=[ + "dalvikvm:I", + "ConnectivityService:S", + "WifiMonitor:S", + "WifiStateTracker:S", + "wpa_supplicant:S", + "NetworkStateTracker:S", + "EmulatedCamera_Camera:S", + "EmulatedCamera_Device:S", + "EmulatedCamera_FakeCamera:S", + "EmulatedCamera_FakeDevice:S", + "EmulatedCamera_CallbackNotifier:S", + "GnssLocationProvider:S", + "Hyphenator:S", + "BatteryStats:S", + ], + format="time", + filter_out_regexps=[], + timeout=None, + buffers=[], + ): + """Returns the contents of the logcat file as a list of strings. + + :param list filter_specs: Optional logcat messages to + be included. + :param str format: Optional logcat format. + :param list filter_out_regexps: Optional logcat messages to be + excluded. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :param list buffers: Log buffers to retrieve. Valid buffers are + "radio", "events", and "main". Defaults to "main". + :return: list of lines logcat output. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + buffers = self._get_logcat_buffer_args(buffers) + cmds = ["logcat", "-v", format, "-d"] + buffers + filter_specs + lines = self.command_output(cmds, timeout=timeout).splitlines() + + for regex in filter_out_regexps: + lines = [line for line in lines if not re.search(regex, line)] + + return lines + + def get_prop(self, prop, timeout=None): + """Gets value of a property from the device via adb shell getprop. + + :param str prop: The propery name. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: str value of property. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + output = self.shell_output("getprop %s" % prop, timeout=timeout) + return output + + def get_state(self, timeout=None): + """Returns the device's state via adb get-state. + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: str value of adb get-state. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + output = self.command_output(["get-state"], timeout=timeout).strip() + return output + + def get_ip_address(self, interfaces=None, timeout=None): + """Returns the device's ip address, or None if it doesn't have one + + :param list interfaces: Interfaces to allow, or None to allow any + non-loopback interface. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: str ip address of the device or None if it could not + be found. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + if not self.is_rooted: + self._logger.warning("Device not rooted. Can not obtain ip address.") + return None + self._logger.debug("get_ip_address: interfaces: %s" % interfaces) + if not interfaces: + interfaces = ["wlan0", "eth0"] + wifi_interface = self.get_prop("wifi.interface", timeout=timeout) + self._logger.debug("get_ip_address: wifi_interface: %s" % wifi_interface) + if wifi_interface and wifi_interface not in interfaces: + interfaces = interfaces.append(wifi_interface) + + # ifconfig interface + # can return two different formats: + # eth0: ip 192.168.1.139 mask 255.255.255.0 flags [up broadcast running multicast] + # or + # wlan0 Link encap:Ethernet HWaddr 00:9A:CD:B8:39:65 + # inet addr:192.168.1.38 Bcast:192.168.1.255 Mask:255.255.255.0 + # inet6 addr: fe80::29a:cdff:feb8:3965/64 Scope: Link + # UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1 + # RX packets:180 errors:0 dropped:0 overruns:0 frame:0 + # TX packets:218 errors:0 dropped:0 overruns:0 carrier:0 + # collisions:0 txqueuelen:1000 + # RX bytes:84577 TX bytes:31202 + + re1_ip = re.compile(r"(\w+): ip ([0-9.]+) mask.*") + # re1_ip will match output of the first format + # with group 1 returning the interface and group 2 returing the ip address. + + # re2_interface will match the interface line in the second format + # while re2_ip will match the inet addr line of the second format. + re2_interface = re.compile(r"(\w+)\s+Link") + re2_ip = re.compile(r"\s+inet addr:([0-9.]+)") + + matched_interface = None + matched_ip = None + re_bad_addr = re.compile(r"127.0.0.1|0.0.0.0") + + self._logger.debug("get_ip_address: ifconfig") + for interface in interfaces: + try: + output = self.shell_output("ifconfig %s" % interface, timeout=timeout) + except ADBError as e: + self._logger.warning( + "get_ip_address ifconfig %s: %s" % (interface, str(e)) + ) + output = "" + + for line in output.splitlines(): + if not matched_interface: + match = re1_ip.match(line) + if match: + matched_interface, matched_ip = match.groups() + else: + match = re2_interface.match(line) + if match: + matched_interface = match.group(1) + else: + match = re2_ip.match(line) + if match: + matched_ip = match.group(1) + + if matched_ip: + if not re_bad_addr.match(matched_ip): + self._logger.debug( + "get_ip_address: found: %s %s" + % (matched_interface, matched_ip) + ) + return matched_ip + matched_interface = None + matched_ip = None + + self._logger.debug("get_ip_address: netcfg") + # Fall back on netcfg if ifconfig does not work. + # $ adb shell netcfg + # lo UP 127.0.0.1/8 0x00000049 00:00:00:00:00:00 + # dummy0 DOWN 0.0.0.0/0 0x00000082 8e:cd:67:48:b7:c2 + # rmnet0 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # rmnet1 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # rmnet2 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # rmnet3 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # rmnet4 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # rmnet5 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # rmnet6 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # rmnet7 DOWN 0.0.0.0/0 0x00000000 00:00:00:00:00:00 + # sit0 DOWN 0.0.0.0/0 0x00000080 00:00:00:00:00:00 + # vip0 DOWN 0.0.0.0/0 0x00001012 00:01:00:00:00:01 + # wlan0 UP 192.168.1.157/24 0x00001043 38:aa:3c:1c:f6:94 + + re3_netcfg = re.compile( + r"(\w+)\s+UP\s+([1-9]\d{0,2}\.\d{1,3}\.\d{1,3}\.\d{1,3})" + ) + try: + output = self.shell_output("netcfg", timeout=timeout) + except ADBError as e: + self._logger.warning("get_ip_address netcfg: %s" % str(e)) + output = "" + for line in output.splitlines(): + match = re3_netcfg.search(line) + if match: + matched_interface, matched_ip = match.groups() + if matched_interface == "lo" or re_bad_addr.match(matched_ip): + matched_interface = None + matched_ip = None + elif matched_ip and matched_interface in interfaces: + self._logger.debug( + "get_ip_address: found: %s %s" % (matched_interface, matched_ip) + ) + return matched_ip + self._logger.debug("get_ip_address: not found") + return matched_ip + + # File management methods + + def remount(self, timeout=None): + """Remount /system/ in read/write mode + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + + rv = self.command_output(["remount"], timeout=timeout) + if "remount succeeded" not in rv: + raise ADBError("Unable to remount device") + + def batch_execute(self, commands, timeout=None, enable_run_as=False): + """Writes commands to a temporary file then executes on the device. + + :param list commands_list: List of commands to be run by the shell. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :param bool enable_run_as: Flag used to temporarily enable use + of run-as to execute the command. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + try: + tmpf = tempfile.NamedTemporaryFile(mode="w", delete=False) + tmpf.write("\n".join(commands)) + tmpf.close() + script = "/sdcard/{}".format(os.path.basename(tmpf.name)) + self.push(tmpf.name, script) + self.shell_output( + "sh {}".format(script), enable_run_as=enable_run_as, timeout=timeout + ) + finally: + if tmpf: + os.unlink(tmpf.name) + if script: + self.rm(script, timeout=timeout) + + def chmod(self, path, recursive=False, mask="777", timeout=None): + """Recursively changes the permissions of a directory on the + device. + + :param str path: The directory name on the device. + :param bool recursive: Flag specifying if the command should be + executed recursively. + :param str mask: The octal permissions. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + # Note that on some tests such as webappstartup, an error + # occurs during recursive calls to chmod where a "No such file + # or directory" error will occur for the + # /data/data/org.mozilla.fennec/files/mozilla/*.webapp0/lock + # which is a symbolic link to a socket: lock -> + # 127.0.0.1:+<port>. On Linux, chmod -R ignores symbolic + # links but it appear Android's version does not. We ignore + # this type of error, but pass on any other errors that are + # detected. + path = posixpath.normpath(path.strip()) + enable_run_as = self.enable_run_as_for_path(path) + self._logger.debug( + "chmod: path=%s, recursive=%s, mask=%s" % (path, recursive, mask) + ) + if self.is_path_internal_storage(path, timeout=timeout): + # External storage on Android is case-insensitive and permissionless + # therefore even with the proper privileges it is not possible + # to change modes. + self._logger.debug("Ignoring attempt to chmod external storage") + return + + # build up the command to be run based on capabilities. + command = ["chmod"] + + if recursive and self._chmod_R: + command.append("-R") + + command.append(mask) + + if recursive and not self._chmod_R: + paths = self.ls(path, recursive=True, timeout=timeout) + base = " ".join(command) + commands = [" ".join([base, entry]) for entry in paths] + self.batch_execute(commands, timeout=timeout, enable_run_as=enable_run_as) + else: + command.append(path) + try: + self.shell_output( + cmd=" ".join(command), timeout=timeout, enable_run_as=enable_run_as + ) + except ADBProcessError as e: + if "No such file or directory" not in str(e): + # It appears that chmod -R with symbolic links will exit with + # exit code 1 but the files apart from the symbolic links + # were transfered. + raise + + def chown(self, path, owner, group=None, recursive=False, timeout=None): + """Run the chown command on the provided path. + + :param str path: path name on the device. + :param str owner: new owner of the path. + :param str group: optional parameter specifying the new group the path + should belong to. + :param bool recursive: optional value specifying whether the command should + operate on files and directories recursively. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before throwing + an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + path = posixpath.normpath(path.strip()) + enable_run_as = self.enable_run_as_for_path(path) + if self.is_path_internal_storage(path, timeout=timeout): + self._logger.warning("Ignoring attempt to chown external storage") + return + + # build up the command to be run based on capabilities. + command = ["chown"] + + if recursive and self._chown_R: + command.append("-R") + + if group: + # officially supported notation is : but . has been checked with + # sdk 17 and it works. + command.append("{owner}.{group}".format(owner=owner, group=group)) + else: + command.append(owner) + + if recursive and not self._chown_R: + # recursive desired, but chown -R is not supported natively. + # like with chmod, get the list of subpaths, put them into a script + # then run it with adb with one call. + paths = self.ls(path, recursive=True, timeout=timeout) + base = " ".join(command) + commands = [" ".join([base, entry]) for entry in paths] + + self.batch_execute(commands, timeout=timeout, enable_run_as=enable_run_as) + else: + # recursive or not, and chown -R is supported natively. + # command can simply be run as provided by the user. + command.append(path) + self.shell_output( + cmd=" ".join(command), timeout=timeout, enable_run_as=enable_run_as + ) + + def _test_path(self, argument, path, timeout=None): + """Performs path and file type checking. + + :param str argument: Command line argument to the test command. + :param str path: The path or filename on the device. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :param bool root: Flag specifying if the command should be + executed as root. + :return: boolean - True if path or filename fulfills the + condition of the test. + :raises: :exc:`ADBTimeoutError` + """ + enable_run_as = self.enable_run_as_for_path(path) + if not enable_run_as and not self._device_serial.startswith("emulator"): + return self.shell_bool( + "test -{arg} {path}".format(arg=argument, path=path), + timeout=timeout, + enable_run_as=False, + ) + # Bug 1572563 - work around intermittent test path failures on emulators. + # The shell built-in test is not supported via run-as. + if argument == "f": + return self.exists(path, timeout=timeout) and not self.is_dir( + path, timeout=timeout + ) + if argument == "d": + return self.shell_bool( + "ls -a {}/".format(path), timeout=timeout, enable_run_as=enable_run_as + ) + if argument == "e": + return self.shell_bool( + "ls -a {}".format(path), timeout=timeout, enable_run_as=enable_run_as + ) + raise ADBError("_test_path: Unknown argument %s" % argument) + + def exists(self, path, timeout=None): + """Returns True if the path exists on the device. + + :param str path: The path name on the device. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :param bool root: Flag specifying if the command should be + executed as root. + :return: boolean - True if path exists. + :raises: :exc:`ADBTimeoutError` + """ + path = posixpath.normpath(path) + return self._test_path("e", path, timeout=timeout) + + def is_dir(self, path, timeout=None): + """Returns True if path is an existing directory on the device. + + :param str path: The directory on the device. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: boolean - True if path exists on the device and is a + directory. + :raises: :exc:`ADBTimeoutError` + """ + path = posixpath.normpath(path) + return self._test_path("d", path, timeout=timeout) + + def is_file(self, path, timeout=None): + """Returns True if path is an existing file on the device. + + :param str path: The file name on the device. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: boolean - True if path exists on the device and is a + file. + :raises: :exc:`ADBTimeoutError` + """ + path = posixpath.normpath(path) + return self._test_path("f", path, timeout=timeout) + + def list_files(self, path, timeout=None): + """Return a list of files/directories contained in a directory + on the device. + + :param str path: The directory name on the device. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: list of files/directories contained in the directory. + :raises: :exc:`ADBTimeoutError` + """ + path = posixpath.normpath(path.strip()) + enable_run_as = self.enable_run_as_for_path(path) + data = [] + if self.is_dir(path, timeout=timeout): + try: + data = self.shell_output( + "%s %s" % (self._ls, path), + timeout=timeout, + enable_run_as=enable_run_as, + ).splitlines() + self._logger.debug("list_files: data: %s" % data) + except ADBError: + self._logger.error( + "Ignoring exception in ADBDevice.list_files\n%s" + % traceback.format_exc() + ) + data[:] = [item for item in data if item] + self._logger.debug("list_files: %s" % data) + return data + + def ls(self, path, recursive=False, timeout=None): + """Return a list of matching files/directories on the device. + + The ls method emulates the behavior of the ls shell command. + It differs from the list_files method by supporting wild cards + and returning matches even if the path is not a directory and + by allowing a recursive listing. + + ls /sdcard always returns /sdcard and not the contents of the + sdcard path. The ls method makes the behavior consistent with + others paths by adjusting /sdcard to /sdcard/. Note this is + also the case of other sdcard related paths such as + /storage/emulated/legacy but no adjustment is made in those + cases. + + The ls method works around a Nexus 4 bug which prevents + recursive listing of directories on the sdcard unless the path + ends with "/*" by adjusting sdcard paths ending in "/" to end + with "/*". This adjustment is only made on official Nexus 4 + builds with property ro.product.model "Nexus 4". Note that + this will fail to return any "hidden" files or directories + which begin with ".". + + :param str path: The directory name on the device. + :param bool recursive: Flag specifying if a recursive listing + is to be returned. If recursive is False, the returned + matches will be relative to the path. If recursive is True, + the returned matches will be absolute paths. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: list of files/directories contained in the directory. + :raises: :exc:`ADBTimeoutError` + """ + path = posixpath.normpath(path.strip()) + enable_run_as = self.enable_run_as_for_path(path) + parent = "" + entries = {} + + if path == "/sdcard": + path += "/" + + # Android 2.3 and later all appear to support ls -R however + # Nexus 4 does not perform a recursive search on the sdcard + # unless the path is a directory with * wild card. + if not recursive: + recursive_flag = "" + else: + recursive_flag = "-R" + if path.startswith("/sdcard") and path.endswith("/"): + model = self.get_prop("ro.product.model", timeout=timeout) + if model == "Nexus 4": + path += "*" + lines = self.shell_output( + "%s %s %s" % (self._ls, recursive_flag, path), + timeout=timeout, + enable_run_as=enable_run_as, + ).splitlines() + for line in lines: + line = line.strip() + if not line: + parent = "" + continue + if line.endswith(":"): # This is a directory + parent = line.replace(":", "/") + entry = parent + # Remove earlier entry which is marked as a file. + if parent[:-1] in entries: + del entries[parent[:-1]] + elif parent: + entry = "%s%s" % (parent, line) + else: + entry = line + entries[entry] = 1 + entry_list = list(entries.keys()) + entry_list.sort() + return entry_list + + def mkdir(self, path, parents=False, timeout=None): + """Create a directory on the device. + + :param str path: The directory name on the device + to be created. + :param bool parents: Flag indicating if the parent directories are + also to be created. Think mkdir -p path. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + + def verify_mkdir(path): + # Verify that the directory was actually created. On some devices + # (x86_64 emulator, v 29.0.11) the directory is sometimes not + # immediately visible, so retries are allowed. + retry = 0 + while retry < 10: + if self.is_dir(path, timeout=timeout): + return True + time.sleep(1) + retry += 1 + return False + + self._sync(timeout=timeout) + + path = posixpath.normpath(path) + enable_run_as = self.enable_run_as_for_path(path) + if parents: + if self._mkdir_p is None or self._mkdir_p: + # Use shell_bool to catch the possible + # non-zero exitcode if -p is not supported. + if self.shell_bool( + "mkdir -p %s" % path, timeout=timeout, enable_run_as=enable_run_as + ) or verify_mkdir(path): + self.chmod(path, recursive=True, timeout=timeout) + self._mkdir_p = True + self._sync(timeout=timeout) + return + # mkdir -p is not supported. create the parent + # directories individually. + if not self.is_dir(posixpath.dirname(path)): + parts = path.split("/") + name = "/" + for part in parts[:-1]: + if part != "": + name = posixpath.join(name, part) + if not self.is_dir(name): + # Use shell_output to allow any non-zero + # exitcode to raise an ADBError. + self.shell_output( + "mkdir %s" % name, + timeout=timeout, + enable_run_as=enable_run_as, + ) + self.chmod(name, recursive=True, timeout=timeout) + self._sync(timeout=timeout) + + # If parents is True and the directory does exist, we don't + # need to do anything. Otherwise we call mkdir. If the + # directory already exists or if it is a file instead of a + # directory, mkdir will fail and we will raise an ADBError. + if not parents or not self.is_dir(path): + self.shell_output( + "mkdir %s" % path, timeout=timeout, enable_run_as=enable_run_as + ) + self._sync(timeout=timeout) + self.chmod(path, recursive=True, timeout=timeout) + if not verify_mkdir(path): + raise ADBError("mkdir %s Failed" % path) + + def push(self, local, remote, timeout=None): + """Pushes a file or directory to the device. + + :param str local: The name of the local file or + directory name. + :param str remote: The name of the remote file or + directory name. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + self._sync(timeout=timeout) + + # remove trailing / + local = os.path.normpath(local) + remote = posixpath.normpath(remote) + copy_required = False + sdcard_remote = None + if os.path.isfile(local) and self.is_dir(remote): + # force push to use the correct filename in the remote directory + remote = posixpath.join(remote, os.path.basename(local)) + elif os.path.isdir(local): + copy_required = True + temp_parent = tempfile.mkdtemp() + remote_name = os.path.basename(remote) + new_local = os.path.join(temp_parent, remote_name) + dir_util.copy_tree(local, new_local) + local = new_local + # See do_sync_push in + # https://android.googlesource.com/platform/system/core/+/master/adb/file_sync_client.cpp + # Work around change in behavior in adb 1.0.36 where if + # the remote destination directory exists, adb push will + # copy the source directory *into* the destination + # directory otherwise it will copy the source directory + # *onto* the destination directory. + if self.is_dir(remote): + remote = "/".join(remote.rstrip("/").split("/")[:-1]) + try: + if not self._run_as_package: + self.command_output(["push", local, remote], timeout=timeout) + self.chmod(remote, recursive=True, timeout=timeout) + else: + # When using run-as to work around the lack of root on a + # device, we can not push directly to the app's + # internal storage since the shell user under which + # the push runs does not have permission to write to + # the app's directory. Instead, we use a two stage + # operation where we first push to a temporary + # intermediate location under /data/local/tmp which + # should be writable by the shell user, then using + # run-as, copy the data into the app's internal + # storage. + try: + with tempfile.NamedTemporaryFile(delete=True) as tmpf: + intermediate = posixpath.join( + "/data/local/tmp", os.path.basename(tmpf.name) + ) + self.command_output(["push", local, intermediate], timeout=timeout) + self.chmod(intermediate, recursive=True, timeout=timeout) + parent_dir = posixpath.dirname(remote) + if not self.is_dir(parent_dir, timeout=timeout): + self.mkdir(parent_dir, parents=True, timeout=timeout) + self.cp(intermediate, remote, recursive=True, timeout=timeout) + finally: + self.rm(intermediate, recursive=True, force=True, timeout=timeout) + except ADBProcessError as e: + if "remote secure_mkdirs failed" not in str(e): + raise + self._logger.warning( + "remote secure_mkdirs failed push('{}', '{}') {}".format( + local, remote, str(e) + ) + ) + # Work around change in Android where push creates + # directories which can not be written by "other" by first + # pushing the source to the sdcard which has no + # permissions issues, then moving it from the sdcard to + # the final destination. + self._logger.info("Falling back to using intermediate /sdcard in push.") + self.mkdir(posixpath.dirname(remote), parents=True, timeout=timeout) + with tempfile.NamedTemporaryFile(delete=True) as tmpf: + sdcard_remote = posixpath.join("/sdcard", os.path.basename(tmpf.name)) + self.command_output(["push", local, sdcard_remote], timeout=timeout) + self.cp(sdcard_remote, remote, recursive=True, timeout=timeout) + except BaseException: + raise + finally: + self._sync(timeout=timeout) + if copy_required: + shutil.rmtree(temp_parent) + if sdcard_remote: + self.rm(sdcard_remote, recursive=True, force=True, timeout=timeout) + + def pull(self, remote, local, timeout=None): + """Pulls a file or directory from the device. + + :param str remote: The path of the remote file or + directory. + :param str local: The path of the local file or + directory name. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + self._sync(timeout=timeout) + + # remove trailing / + local = os.path.normpath(local) + remote = posixpath.normpath(remote) + copy_required = False + original_local = local + if os.path.isdir(local) and self.is_dir(remote): + # See do_sync_pull in + # https://android.googlesource.com/platform/system/core/+/master/adb/file_sync_client.cpp + # Work around change in behavior in adb 1.0.36 where if + # the local destination directory exists, adb pull will + # copy the source directory *into* the destination + # directory otherwise it will copy the source directory + # *onto* the destination directory. + # + # If the destination directory does exist, pull to its + # parent directory. If the source and destination leaf + # directory names are different, pull the source directory + # into a temporary directory and then copy the temporary + # directory onto the destination. + local_name = os.path.basename(local) + remote_name = os.path.basename(remote) + if local_name != remote_name: + copy_required = True + temp_parent = tempfile.mkdtemp() + local = os.path.join(temp_parent, remote_name) + else: + local = "/".join(local.rstrip("/").split("/")[:-1]) + try: + if not self._run_as_package: + # We must first make the remote directory readable. + self.chmod(remote, recursive=True, timeout=timeout) + self.command_output(["pull", remote, local], timeout=timeout) + else: + # When using run-as to work around the lack of root on + # a device, we can not pull directly from the apps + # internal storage since the shell user under which + # the pull runs does not have permission to read from + # the app's directory. Instead, we use a two stage + # operation where we first use run-as to copy the data + # from the app's internal storage to a temporary + # intermediate location under /data/local/tmp which + # should be writable by the shell user, then using + # pull, to copy the data off of the device. + try: + with tempfile.NamedTemporaryFile(delete=True) as tmpf: + intermediate = posixpath.join( + "/data/local/tmp", os.path.basename(tmpf.name) + ) + # When using run-as <app>, we must first use the + # shell to create the intermediate and chmod it + # before the app will be able to access it. + if self.is_dir(remote, timeout=timeout): + self.mkdir( + posixpath.join(intermediate, remote_name), + parents=True, + timeout=timeout, + ) + else: + self.shell_output("echo > %s" % intermediate, timeout=timeout) + self.chmod(intermediate, timeout=timeout) + self.cp(remote, intermediate, recursive=True, timeout=timeout) + self.command_output(["pull", intermediate, local], timeout=timeout) + except ADBError as e: + self._logger.error("pull %s %s: %s" % (intermediate, local, str(e))) + finally: + self.rm(intermediate, recursive=True, force=True, timeout=timeout) + finally: + if copy_required: + dir_util.copy_tree(local, original_local) + shutil.rmtree(temp_parent) + + def get_file(self, remote, offset=None, length=None, timeout=None): + """Pull file from device and return the file's content + + :param str remote: The path of the remote file. + :param offset: If specified, return only content beyond this offset. + :param length: If specified, limit content length accordingly. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + self._sync(timeout=timeout) + + with tempfile.NamedTemporaryFile() as tf: + self.pull(remote, tf.name, timeout=timeout) + with io.open(tf.name, mode="rb") as tf2: + # ADB pull does not support offset and length, but we can + # instead read only the requested portion of the local file + if offset is not None and length is not None: + tf2.seek(offset) + return tf2.read(length) + if offset is not None: + tf2.seek(offset) + return tf2.read() + return tf2.read() + + def rm(self, path, recursive=False, force=False, timeout=None): + """Delete files or directories on the device. + + :param str path: The path of the remote file or directory. + :param bool recursive: Flag specifying if the command is + to be applied recursively to the target. Default is False. + :param bool force: Flag which if True will not raise an + error when attempting to delete a non-existent file. Default + is False. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + path = posixpath.normpath(path) + enable_run_as = self.enable_run_as_for_path(path) + self._sync(timeout=timeout) + + cmd = "rm" + if recursive: + cmd += " -r" + try: + self.shell_output( + "%s %s" % (cmd, path), timeout=timeout, enable_run_as=enable_run_as + ) + self._sync(timeout=timeout) + if self.exists(path, timeout=timeout): + raise ADBError('rm("%s") failed to remove path.' % path) + except ADBError as e: + if not force and "No such file or directory" in str(e): + raise + if "Directory not empty" in str(e): + raise + if self._verbose and "No such file or directory" not in str(e): + self._logger.error( + "rm %s recursive=%s force=%s timeout=%s enable_run_as=%s: %s" + % (path, recursive, force, timeout, enable_run_as, str(e)) + ) + + def rmdir(self, path, timeout=None): + """Delete empty directory on the device. + + :param str path: The directory name on the device. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + path = posixpath.normpath(path) + enable_run_as = self.enable_run_as_for_path(path) + self.shell_output( + "rmdir %s" % path, timeout=timeout, enable_run_as=enable_run_as + ) + self._sync(timeout=timeout) + if self.is_dir(path, timeout=timeout): + raise ADBError('rmdir("%s") failed to remove directory.' % path) + + # Process management methods + + def get_process_list(self, timeout=None): + """Returns list of tuples (pid, name, user) for running + processes on device. + + :param int timeout: The maximum time + in seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, + the value set in the ADBDevice constructor is used. + :return: list of (pid, name, user) tuples for running processes + on the device. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + adb_process = None + max_attempts = 2 + try: + for attempt in range(1, max_attempts + 1): + adb_process = self.shell("ps", timeout=timeout) + if adb_process.timedout: + raise ADBTimeoutError("%s" % adb_process) + if adb_process.exitcode: + raise ADBProcessError(adb_process) + # first line is the headers + header = six.ensure_str(adb_process.stdout_file.readline()) + pid_i = -1 + user_i = -1 + els = header.split() + for i in range(len(els)): + item = els[i].lower() + if item == "user": + user_i = i + elif item == "pid": + pid_i = i + if user_i != -1 and pid_i != -1: + break + # if this isn't the final attempt, don't print this as an error + if attempt < max_attempts: + self._logger.info( + "get_process_list: attempt: %d %s" % (attempt, header) + ) + else: + raise ADBError( + "get_process_list: Unknown format: %s: %s" + % (header, adb_process) + ) + ret = [] + line = six.ensure_str(adb_process.stdout_file.readline()) + while line: + els = line.split() + try: + ret.append([int(els[pid_i]), els[-1], els[user_i]]) + except ValueError: + self._logger.error( + "get_process_list: %s %s\n%s" + % (header, line, traceback.format_exc()) + ) + raise ADBError( + "get_process_list: %s: %s: %s" % (header, line, adb_process) + ) + except IndexError: + self._logger.error( + "get_process_list: %s %s els %s pid_i %s user_i %s\n%s" + % (header, line, els, pid_i, user_i, traceback.format_exc()) + ) + raise ADBError( + "get_process_list: %s: %s els %s pid_i %s user_i %s: %s" + % (header, line, els, pid_i, user_i, adb_process) + ) + line = six.ensure_str(adb_process.stdout_file.readline()) + self._logger.debug("get_process_list: %s" % ret) + return ret + finally: + if adb_process and isinstance(adb_process.stdout_file, io.IOBase): + adb_process.stdout_file.close() + + def kill(self, pids, sig=None, attempts=3, wait=5, timeout=None): + """Kills processes on the device given a list of process ids. + + :param list pids: process ids to be killed. + :param int sig: signal to be sent to the process. + :param integer attempts: number of attempts to try to + kill the processes. + :param integer wait: number of seconds to wait after each attempt. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + pid_list = [str(pid) for pid in pids] + for attempt in range(attempts): + args = ["kill"] + if sig: + args.append("-%d" % sig) + args.extend(pid_list) + try: + self.shell_output(" ".join(args), timeout=timeout) + except ADBError as e: + if "No such process" not in str(e): + raise + pid_set = set(pid_list) + current_pid_set = set( + [str(proc[0]) for proc in self.get_process_list(timeout=timeout)] + ) + pid_list = list(pid_set.intersection(current_pid_set)) + if not pid_list: + break + self._logger.debug( + "Attempt %d of %d to kill processes %s failed" + % (attempt + 1, attempts, pid_list) + ) + time.sleep(wait) + + if pid_list: + raise ADBError("kill: processes %s not killed" % pid_list) + + def pkill(self, appname, sig=None, attempts=3, wait=5, timeout=None): + """Kills a processes on the device matching a name. + + :param str appname: The app name of the process to + be killed. Note that only the first 75 characters of the + process name are significant. + :param int sig: optional signal to be sent to the process. + :param integer attempts: number of attempts to try to + kill the processes. + :param integer wait: number of seconds to wait after each attempt. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :param bool root: Flag specifying if the command should + be executed as root. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + pids = self.pidof(appname, timeout=timeout) + + if not pids: + return + + try: + self.kill(pids, sig, attempts=attempts, wait=wait, timeout=timeout) + except ADBError as e: + if self.process_exist(appname, timeout=timeout): + raise e + + def process_exist(self, process_name, timeout=None): + """Returns True if process with name process_name is running on + device. + + :param str process_name: The name of the process + to check. Note that only the first 75 characters of the + process name are significant. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: boolean - True if process exists. + + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + if not isinstance(process_name, six.string_types): + raise ADBError("Process name %s is not a string" % process_name) + + # Filter out extra spaces. + parts = [x for x in process_name.split(" ") if x != ""] + process_name = " ".join(parts) + + # Filter out the quoted env string if it exists + # ex: '"name=value;name2=value2;etc=..." process args' -> 'process args' + parts = process_name.split('"') + if len(parts) > 2: + process_name = " ".join(parts[2:]).strip() + + pieces = process_name.split(" ") + parts = pieces[0].split("/") + app = parts[-1] + + if self.pidof(app, timeout=timeout): + return True + return False + + def cp(self, source, destination, recursive=False, timeout=None): + """Copies a file or directory on the device. + + :param source: string containing the path of the source file or + directory. + :param destination: string containing the path of the destination file + or directory. + :param recursive: optional boolean indicating if a recursive copy is to + be performed. Required if the source is a directory. Defaults to + False. Think cp -R source destination. + :param int timeout: optional integer specifying the maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + source = posixpath.normpath(source) + destination = posixpath.normpath(destination) + enable_run_as = self.enable_run_as_for_path( + source + ) or self.enable_run_as_for_path(destination) + if self._have_cp: + r = "-R" if recursive else "" + self.shell_output( + "cp %s %s %s" % (r, source, destination), + timeout=timeout, + enable_run_as=enable_run_as, + ) + self.chmod(destination, recursive=recursive, timeout=timeout) + self._sync(timeout=timeout) + return + + # Emulate cp behavior depending on if source and destination + # already exists and whether they are a directory or file. + if not self.exists(source, timeout=timeout): + raise ADBError("cp: can't stat '%s': No such file or directory" % source) + + if self.is_file(source, timeout=timeout): + if self.is_dir(destination, timeout=timeout): + # Copy the source file into the destination directory + destination = posixpath.join(destination, os.path.basename(source)) + self.shell_output("dd if=%s of=%s" % (source, destination), timeout=timeout) + self.chmod(destination, recursive=recursive, timeout=timeout) + self._sync(timeout=timeout) + return + + if self.is_file(destination, timeout=timeout): + raise ADBError("cp: %s: Not a directory" % destination) + + if not recursive: + raise ADBError("cp: omitting directory '%s'" % source) + + if self.is_dir(destination, timeout=timeout): + # Copy the source directory into the destination directory. + destination_dir = posixpath.join(destination, os.path.basename(source)) + else: + # Copy the contents of the source directory into the + # destination directory. + destination_dir = destination + + try: + # Do not create parent directories since cp does not. + self.mkdir(destination_dir, timeout=timeout) + except ADBError as e: + if "File exists" not in str(e): + raise + + for i in self.list_files(source, timeout=timeout): + self.cp( + posixpath.join(source, i), + posixpath.join(destination_dir, i), + recursive=recursive, + timeout=timeout, + ) + self.chmod(destination_dir, recursive=True, timeout=timeout) + + def mv(self, source, destination, timeout=None): + """Moves a file or directory on the device. + + :param source: string containing the path of the source file or + directory. + :param destination: string containing the path of the destination file + or directory. + :param int timeout: optional integer specifying the maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + source = posixpath.normpath(source) + destination = posixpath.normpath(destination) + enable_run_as = self.enable_run_as_for_path( + source + ) or self.enable_run_as_for_path(destination) + self.shell_output( + "mv %s %s" % (source, destination), + timeout=timeout, + enable_run_as=enable_run_as, + ) + + def reboot(self, timeout=None): + """Reboots the device. + + :param int timeout: optional integer specifying the maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + + reboot() reboots the device, issues an adb wait-for-device in order to + wait for the device to complete rebooting, then calls is_device_ready() + to determine if the device has completed booting. + + If the device supports running adbd as root, adbd will be + restarted running as root. Then, if the device supports + SELinux, setenforce Permissive will be called to change + SELinux to permissive. This must be done after adbd is + restarted in order for the SELinux Permissive setting to + persist. + + """ + self.command_output(["reboot"], timeout=timeout) + self._wait_for_boot_completed(timeout=timeout) + return self.is_device_ready(timeout=timeout) + + def get_sysinfo(self, timeout=None): + """ + Returns a detailed dictionary of information strings about the device. + + :param int timeout: optional integer specifying the maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + + :raises: :exc:`ADBTimeoutError` + """ + results = {"info": self.get_info(timeout=timeout)} + for service in ( + "meminfo", + "cpuinfo", + "dbinfo", + "procstats", + "usagestats", + "battery", + "batterystats", + "diskstats", + ): + results[service] = self.shell_output( + "dumpsys %s" % service, timeout=timeout + ) + return results + + def get_info(self, directive=None, timeout=None): + """ + Returns a dictionary of information strings about the device. + + :param directive: information you want to get. Options are: + - `battery` - battery charge as a percentage + - `disk` - total, free, available bytes on disk + - `id` - unique id of the device + - `os` - name of the os + - `process` - list of running processes (same as ps) + - `systime` - system time of the device + - `uptime` - uptime of the device + + If `directive` is `None`, will return all available information + :param int timeout: optional integer specifying the maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + directives = ["battery", "disk", "id", "os", "process", "systime", "uptime"] + + if directive in directives: + directives = [directive] + + info = {} + if "battery" in directives: + info["battery"] = self.get_battery_percentage(timeout=timeout) + if "disk" in directives: + info["disk"] = self.shell_output( + "df /data /system /sdcard", timeout=timeout + ).splitlines() + if "id" in directives: + info["id"] = self.command_output(["get-serialno"], timeout=timeout) + if "os" in directives: + info["os"] = self.get_prop("ro.build.display.id", timeout=timeout) + if "process" in directives: + ps = self.shell_output("ps", timeout=timeout) + info["process"] = ps.splitlines() + if "systime" in directives: + info["systime"] = self.shell_output("date", timeout=timeout) + if "uptime" in directives: + uptime = self.shell_output("uptime", timeout=timeout) + if uptime: + m = re.match(r"up time: ((\d+) days, )*(\d{2}):(\d{2}):(\d{2})", uptime) + if m: + uptime = "%d days %d hours %d minutes %d seconds" % tuple( + [int(g or 0) for g in m.groups()[1:]] + ) + info["uptime"] = uptime + return info + + # Properties to manage SELinux on the device: + # https://source.android.com/devices/tech/security/selinux/index.html + # setenforce [ Enforcing | Permissive | 1 | 0 ] + # getenforce returns either Enforcing or Permissive + + @property + def selinux(self): + """Returns True if SELinux is supported, False otherwise.""" + if self._selinux is None: + self._selinux = self.enforcing != "" + return self._selinux + + @property + def enforcing(self): + try: + enforce = self.shell_output("getenforce") + except ADBError as e: + enforce = "" + self._logger.warning("Unable to get SELinux enforcing due to %s." % e) + return enforce + + @enforcing.setter + def enforcing(self, value): + """Set SELinux mode. + :param str value: The new SELinux mode. Should be one of + Permissive, 0, Enforcing, 1 but it is not validated. + """ + try: + self.shell_output("setenforce %s" % value) + self._logger.info("Setting SELinux %s" % value) + except ADBError as e: + self._logger.warning("Unable to set SELinux Permissive due to %s." % e) + + # Informational methods + + def get_battery_percentage(self, timeout=None): + """Returns the battery charge as a percentage. + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: battery charge as a percentage. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + level = None + scale = None + percentage = 0 + cmd = "dumpsys battery" + re_parameter = re.compile(r"\s+(\w+):\s+(\d+)") + lines = self.shell_output(cmd, timeout=timeout).splitlines() + for line in lines: + match = re_parameter.match(line) + if match: + parameter = match.group(1) + value = match.group(2) + if parameter == "level": + level = float(value) + elif parameter == "scale": + scale = float(value) + if parameter is not None and scale is not None: + # pylint --py3k W1619 + percentage = 100.0 * level / scale + break + return percentage + + def get_top_activity(self, timeout=None): + """Returns the name of the top activity (focused app) reported by dumpsys + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADBDevice constructor is used. + :return: package name of top activity or None (cannot be determined) + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + if self.version < version_codes.Q: + return self._get_top_activity_P(timeout=timeout) + return self._get_top_activity_Q(timeout=timeout) + + def _get_top_activity_P(self, timeout=None): + """Returns the name of the top activity (focused app) reported by dumpsys + for Android 9 and earlier. + """ + package = None + data = None + cmd = "dumpsys window windows" + verbose = self._verbose + try: + self._verbose = False + data = self.shell_output(cmd, timeout=timeout) + except Exception as e: + # dumpsys intermittently fails on some platforms. + self._logger.info("_get_top_activity_P: Exception %s: %s" % (cmd, e)) + return package + finally: + self._verbose = verbose + m = re.search("mFocusedApp(.+)/", data) + if not m: + # alternative format seen on newer versions of Android + m = re.search("FocusedApplication(.+)/", data) + if m: + line = m.group(0) + # Extract package name: string of non-whitespace ending in forward slash + m = re.search(r"(\S+)/$", line) + if m: + package = m.group(1) + if self._verbose: + self._logger.debug("get_top_activity: %s" % str(package)) + return package + + def _get_top_activity_Q(self, timeout=None): + """Returns the name of the top activity (focused app) reported by dumpsys + for Android 10 and later. + """ + package = None + data = None + cmd = "dumpsys window" + verbose = self._verbose + try: + self._verbose = False + data = self.shell_output(cmd, timeout=timeout) + except Exception as e: + # dumpsys intermittently fails on some platforms (4.3 arm emulator) + self._logger.info("_get_top_activity_Q: Exception %s: %s" % (cmd, e)) + return package + finally: + self._verbose = verbose + m = re.search(r"mFocusedWindow=Window{\S+ \S+ (\S+)/\S+}", data) + if m: + package = m.group(1) + if self._verbose: + self._logger.debug("get_top_activity: %s" % str(package)) + return package + + # System control methods + + def is_device_ready(self, timeout=None): + """Checks if a device is ready for testing. + + This method uses the android only package manager to check for + readiness. + + :param int timeout: The maximum time + in seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + # command_output automatically inserts a 'wait-for-device' + # argument to adb. Issuing an empty command is the same as adb + # -s <device> wait-for-device. We don't send an explicit + # 'wait-for-device' since that would add duplicate + # 'wait-for-device' arguments which is an error in newer + # versions of adb. + self._wait_for_boot_completed(timeout=timeout) + pm_error_string = "Error: Could not access the Package Manager" + ready_path = os.path.join(self.test_root, "ready") + for attempt in range(self._device_ready_retry_attempts): + failure = "Unknown failure" + success = True + try: + state = self.get_state(timeout=timeout) + if state != "device": + failure = "Device state: %s" % state + success = False + else: + if self.enforcing != "Permissive": + self.enforcing = "Permissive" + if self.is_dir(ready_path, timeout=timeout): + self.rmdir(ready_path, timeout=timeout) + self.mkdir(ready_path, timeout=timeout) + self.rmdir(ready_path, timeout=timeout) + # Invoke the pm list packages command to see if it is up and + # running. + data = self.shell_output( + "pm list packages org.mozilla", timeout=timeout + ) + if pm_error_string in data: + failure = data + success = False + except ADBError as e: + success = False + failure = str(e) + + if not success: + self._logger.debug( + "Attempt %s of %s device not ready: %s" + % (attempt + 1, self._device_ready_retry_attempts, failure) + ) + time.sleep(self._device_ready_retry_wait) + + return success + + def power_on(self, timeout=None): + """Sets the device's power stayon value. + + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + try: + self.shell_output("svc power stayon true", timeout=timeout) + except ADBError as e: + # Executing this via adb shell errors, but not interactively. + # Any other exitcode is a real error. + if "exitcode: 137" not in str(e): + raise + self._logger.warning("Unable to set power stayon true: %s" % e) + + # Application management methods + + def add_change_device_settings(self, app_name, timeout=None): + """ + Allows the test to change Android device settings. + :param str: app_name: Name of application (e.g. `org.mozilla.fennec`) + """ + self.shell_output( + "appops set %s android:write_settings allow" % app_name, + timeout=timeout, + enable_run_as=False, + ) + + def add_mock_location(self, app_name, timeout=None): + """ + Allows the Android device to use mock locations. + :param str: app_name: Name of application (e.g. `org.mozilla.fennec`) + """ + self.shell_output( + "appops set %s android:mock_location allow" % app_name, + timeout=timeout, + enable_run_as=False, + ) + + def grant_runtime_permissions(self, app_name, timeout=None): + """ + Grant required runtime permissions to the specified app + (typically org.mozilla.fennec_$USER). + + :param str: app_name: Name of application (e.g. `org.mozilla.fennec`) + """ + if self.version >= version_codes.M: + permissions = [ + "android.permission.READ_EXTERNAL_STORAGE", + "android.permission.ACCESS_COARSE_LOCATION", + "android.permission.ACCESS_FINE_LOCATION", + "android.permission.CAMERA", + "android.permission.RECORD_AUDIO", + ] + if self.version < version_codes.R: + # WRITE_EXTERNAL_STORAGE is no longer available + # in Android 11+ + permissions.append("android.permission.WRITE_EXTERNAL_STORAGE") + self._logger.info("Granting important runtime permissions to %s" % app_name) + for permission in permissions: + try: + self.shell_output( + "pm grant %s %s" % (app_name, permission), + timeout=timeout, + enable_run_as=False, + ) + except ADBError as e: + self._logger.warning( + "Unable to grant runtime permission %s to %s due to %s" + % (permission, app_name, e) + ) + + def install_app_bundle(self, bundletool, bundle_path, java_home=None, timeout=None): + """Installs an app bundle (AAB) on the device. + + :param str bundletool: Path to the bundletool jar + :param str bundle_path: The aab file name to be installed. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :param str java_home: Path to the JDK location. Will default to + $JAVA_HOME when not specififed. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + device_serial = self._device_serial or os.environ.get("ANDROID_SERIAL") + java_home = java_home or os.environ.get("JAVA_HOME") + with tempfile.TemporaryDirectory() as temporaryDirectory: + # bundletool doesn't come with a debug-key so we need to provide + # one ourselves. + keystore_path = os.path.join(temporaryDirectory, "debug.keystore") + keytool_path = os.path.join(java_home, "bin", "keytool") + key_gen = [ + keytool_path, + "-genkey", + "-v", + "-keystore", + keystore_path, + "-alias", + "androiddebugkey", + "-storepass", + "android", + "-keypass", + "android", + "-keyalg", + "RSA", + "-validity", + "14000", + "-dname", + "cn=Unknown, ou=Unknown, o=Unknown, c=Unknown", + ] + self._logger.info("key_gen: %s" % key_gen) + try: + subprocess.check_call(key_gen, timeout=timeout) + except subprocess.TimeoutExpired: + raise ADBTimeoutError("ADBDevice: unable to generate key") + + apks_path = "{}/tmp.apks".format(temporaryDirectory) + java_path = os.path.join(java_home, "bin", "java") + build_apks = [ + java_path, + "-jar", + bundletool, + "build-apks", + "--bundle={}".format(bundle_path), + "--output={}".format(apks_path), + "--connected-device", + "--device-id={}".format(device_serial), + "--adb={}".format(self._adb_path), + "--ks={}".format(keystore_path), + "--ks-key-alias=androiddebugkey", + "--key-pass=pass:android", + "--ks-pass=pass:android", + ] + self._logger.info("build_apks: %s" % build_apks) + + try: + subprocess.check_call(build_apks, timeout=timeout) + except subprocess.TimeoutExpired: + raise ADBTimeoutError("ADBDevice: unable to generate apks") + install_apks = [ + java_path, + "-jar", + bundletool, + "install-apks", + "--apks={}".format(apks_path), + "--device-id={}".format(device_serial), + "--adb={}".format(self._adb_path), + ] + self._logger.info("install_apks: %s" % install_apks) + + try: + subprocess.check_call(install_apks, timeout=timeout) + except subprocess.TimeoutExpired: + raise ADBTimeoutError("ADBDevice: unable to install apks") + + def install_app(self, apk_path, replace=False, timeout=None): + """Installs an app on the device. + + :param str apk_path: The apk file name to be installed. + :param bool replace: If True, replace existing application. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :return: string - name of installed package. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + dump_packages = "dumpsys package packages" + packages_before = set(self.shell_output(dump_packages).split("\n")) + cmd = ["install"] + if replace: + cmd.append("-r") + cmd.append(apk_path) + data = self.command_output(cmd, timeout=timeout) + if data.find("Success") == -1: + raise ADBError("install failed for %s. Got: %s" % (apk_path, data)) + packages_after = set(self.shell_output(dump_packages).split("\n")) + packages_diff = packages_after - packages_before + package_name = None + re_pkg = re.compile(r"\s+pkg=Package{[^ ]+ (.*)}") + for diff in packages_diff: + match = re_pkg.match(diff) + if match: + package_name = match.group(1) + break + return package_name + + def is_app_installed(self, app_name, timeout=None): + """Returns True if an app is installed on the device. + + :param str app_name: name of the app to be checked. + :param int timeout: maximum time in seconds for any spawned + adb process to complete before throwing an ADBTimeoutError. + This timeout is per adb call. If it is not specified, + the value set in the ADB constructor is used. + + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + pm_error_string = "Error: Could not access the Package Manager" + data = self.shell_output( + "pm list package %s" % app_name, timeout=timeout, enable_run_as=False + ) + if pm_error_string in data: + raise ADBError(pm_error_string) + output = [line for line in data.splitlines() if line.strip()] + return any(["package:{}".format(app_name) == out for out in output]) + + def launch_application( + self, + app_name, + activity_name, + intent, + url=None, + extras=None, + wait=True, + fail_if_running=True, + grant_runtime_permissions=True, + timeout=None, + is_service=False, + ): + """Launches an Android application + + :param str app_name: Name of application (e.g. `com.android.chrome`) + :param str activity_name: Name of activity to launch (e.g. `.Main`) + :param str intent: Intent to launch application with + :param str url: URL to open + :param dict extras: Extra arguments for application. + :param bool wait: If True, wait for application to start before + returning. + :param bool fail_if_running: Raise an exception if instance of + application is already running. + :param bool grant_runtime_permissions: Grant special runtime + permissions. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :param bool is_service: Whether we want to launch a service or not. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + # If fail_if_running is True, we throw an exception here. Only one + # instance of an application can be running at once on Android, + # starting a new instance may not be what we want depending on what + # we want to do + if fail_if_running and self.process_exist(app_name, timeout=timeout): + raise ADBError( + "Only one instance of an application may be running " "at once" + ) + + if grant_runtime_permissions: + self.grant_runtime_permissions(app_name) + + acmd = ["am"] + ["startservice" if is_service else "start"] + if wait: + acmd.extend(["-W"]) + acmd.extend( + [ + "-n", + "%s/%s" % (app_name, activity_name), + ] + ) + if intent: + acmd.extend(["-a", intent]) + + # Note that isinstance(True, int) and isinstance(False, int) + # is True. This means we must test the type of the value + # against bool prior to testing it against int in order to + # prevent falsely identifying a bool value as an int. + if extras: + for (key, val) in extras.items(): + if isinstance(val, bool): + extra_type_param = "--ez" + elif isinstance(val, int): + extra_type_param = "--ei" + else: + extra_type_param = "--es" + acmd.extend([extra_type_param, str(key), str(val)]) + + if url: + acmd.extend(["-d", url]) + + cmd = self._escape_command_line(acmd) + self._logger.info("launch_application: %s" % cmd) + cmd_output = self.shell_output(cmd, timeout=timeout) + if "Error:" in cmd_output: + for line in cmd_output.split("\n"): + self._logger.info(line) + raise ADBError( + "launch_application %s/%s failed" % (app_name, activity_name) + ) + + def launch_fennec( + self, + app_name, + intent="android.intent.action.VIEW", + moz_env=None, + extra_args=None, + url=None, + wait=True, + fail_if_running=True, + timeout=None, + ): + """Convenience method to launch Fennec on Android with various + debugging arguments + + :param str app_name: Name of fennec application (e.g. + `org.mozilla.fennec`) + :param str intent: Intent to launch application. + :param str moz_env: Mozilla specific environment to pass into + application. + :param str extra_args: Extra arguments to be parsed by fennec. + :param str url: URL to open + :param bool wait: If True, wait for application to start before + returning. + :param bool fail_if_running: Raise an exception if instance of + application is already running. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + extras = {} + + if moz_env: + # moz_env is expected to be a dictionary of environment variables: + # Fennec itself will set them when launched + for (env_count, (env_key, env_val)) in enumerate(moz_env.items()): + extras["env" + str(env_count)] = env_key + "=" + env_val + + # Additional command line arguments that fennec will read and use (e.g. + # with a custom profile) + if extra_args: + extras["args"] = " ".join(extra_args) + + self.launch_application( + app_name, + "org.mozilla.gecko.BrowserApp", + intent, + url=url, + extras=extras, + wait=wait, + fail_if_running=fail_if_running, + timeout=timeout, + ) + + def launch_service( + self, + app_name, + activity_name=None, + intent="android.intent.action.MAIN", + moz_env=None, + extra_args=None, + url=None, + e10s=False, + wait=True, + grant_runtime_permissions=False, + out_file=None, + timeout=None, + ): + """Convenience method to launch a service on Android with various + debugging arguments; convenient for geckoview apps. + + :param str app_name: Name of application (e.g. + `org.mozilla.geckoview_example` or `org.mozilla.geckoview.test_runner`) + :param str activity_name: Activity name, like `GeckoViewActivity`, or + `TestRunnerActivity`. + :param str intent: Intent to launch application. + :param str moz_env: Mozilla specific environment to pass into + application. + :param str extra_args: Extra arguments to be parsed by the app. + :param str url: URL to open + :param bool e10s: If True, run in multiprocess mode. + :param bool wait: If True, wait for application to start before + returning. + :param bool grant_runtime_permissions: Grant special runtime + permissions. + :param str out_file: File where to redirect the output to + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + extras = {} + + if moz_env: + # moz_env is expected to be a dictionary of environment variables: + # geckoview_example itself will set them when launched + for (env_count, (env_key, env_val)) in enumerate(moz_env.items()): + extras["env" + str(env_count)] = env_key + "=" + env_val + + # Additional command line arguments that the app will read and use (e.g. + # with a custom profile) + if extra_args: + for (arg_count, arg) in enumerate(extra_args): + extras["arg" + str(arg_count)] = arg + + extras["use_multiprocess"] = e10s + extras["out_file"] = out_file + self.launch_application( + app_name, + "%s.%s" % (app_name, activity_name), + intent, + url=url, + extras=extras, + wait=wait, + grant_runtime_permissions=grant_runtime_permissions, + timeout=timeout, + is_service=True, + fail_if_running=False, + ) + + def launch_activity( + self, + app_name, + activity_name=None, + intent="android.intent.action.MAIN", + moz_env=None, + extra_args=None, + url=None, + e10s=False, + wait=True, + fail_if_running=True, + timeout=None, + ): + """Convenience method to launch an application on Android with various + debugging arguments; convenient for geckoview apps. + + :param str app_name: Name of application (e.g. + `org.mozilla.geckoview_example` or `org.mozilla.geckoview.test_runner`) + :param str activity_name: Activity name, like `GeckoViewActivity`, or + `TestRunnerActivity`. + :param str intent: Intent to launch application. + :param str moz_env: Mozilla specific environment to pass into + application. + :param str extra_args: Extra arguments to be parsed by the app. + :param str url: URL to open + :param bool e10s: If True, run in multiprocess mode. + :param bool wait: If True, wait for application to start before + returning. + :param bool fail_if_running: Raise an exception if instance of + application is already running. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + extras = {} + + if moz_env: + # moz_env is expected to be a dictionary of environment variables: + # geckoview_example itself will set them when launched + for (env_count, (env_key, env_val)) in enumerate(moz_env.items()): + extras["env" + str(env_count)] = env_key + "=" + env_val + + # Additional command line arguments that the app will read and use (e.g. + # with a custom profile) + if extra_args: + for (arg_count, arg) in enumerate(extra_args): + extras["arg" + str(arg_count)] = arg + + extras["use_multiprocess"] = e10s + self.launch_application( + app_name, + "%s.%s" % (app_name, activity_name), + intent, + url=url, + extras=extras, + wait=wait, + fail_if_running=fail_if_running, + timeout=timeout, + ) + + def stop_application(self, app_name, timeout=None): + """Stops the specified application + + For Android 3.0+, we use the "am force-stop" to do this, which + is reliable and does not require root. For earlier versions of + Android, we simply try to manually kill the processes started + by the app repeatedly until none is around any more. This is + less reliable and does require root. + + :param str app_name: Name of application (e.g. `com.android.chrome`) + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :param bool root: Flag specifying if the command should be + executed as root. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + if self.version >= version_codes.HONEYCOMB: + self.shell_output("am force-stop %s" % app_name, timeout=timeout) + else: + num_tries = 0 + max_tries = 5 + while self.process_exist(app_name, timeout=timeout): + if num_tries > max_tries: + raise ADBError( + "Couldn't successfully kill %s after %s " + "tries" % (app_name, max_tries) + ) + self.pkill(app_name, timeout=timeout) + num_tries += 1 + + # sleep for a short duration to make sure there are no + # additional processes in the process of being launched + # (this is not 100% guaranteed to work since it is inherently + # racey, but it's the best we can do) + time.sleep(1) + + def uninstall_app(self, app_name, reboot=False, timeout=None): + """Uninstalls an app on the device. + + :param str app_name: The name of the app to be + uninstalled. + :param bool reboot: Flag indicating that the device should + be rebooted after the app is uninstalled. No reboot occurs + if the app is not installed. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + if self.is_app_installed(app_name, timeout=timeout): + data = self.command_output(["uninstall", app_name], timeout=timeout) + if data.find("Success") == -1: + self._logger.debug("uninstall_app failed: %s" % data) + raise ADBError("uninstall failed for %s. Got: %s" % (app_name, data)) + self.run_as_package = None + if reboot: + self.reboot(timeout=timeout) + + def update_app(self, apk_path, timeout=None): + """Updates an app on the device and reboots. + + :param str apk_path: The apk file name to be + updated. + :param int timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. + This timeout is per adb call. The total time spent + may exceed this value. If it is not specified, the value + set in the ADB constructor is used. + :raises: :exc:`ADBTimeoutError` + :exc:`ADBError` + """ + cmd = ["install", "-r"] + if self.version >= version_codes.M: + cmd.append("-g") + cmd.append(apk_path) + output = self.command_output(cmd, timeout=timeout) + self.reboot(timeout=timeout) + return output diff --git a/testing/mozbase/mozdevice/mozdevice/adb_android.py b/testing/mozbase/mozdevice/mozdevice/adb_android.py new file mode 100644 index 0000000000..135fda4195 --- /dev/null +++ b/testing/mozbase/mozdevice/mozdevice/adb_android.py @@ -0,0 +1,13 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from .adb import ADBDevice + + +class ADBAndroid(ADBDevice): + """ADBAndroid functionality is now provided by ADBDevice. New callers + should use ADBDevice. + """ + + pass diff --git a/testing/mozbase/mozdevice/mozdevice/remote_process_monitor.py b/testing/mozbase/mozdevice/mozdevice/remote_process_monitor.py new file mode 100644 index 0000000000..29e0d4e008 --- /dev/null +++ b/testing/mozbase/mozdevice/mozdevice/remote_process_monitor.py @@ -0,0 +1,285 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import re +import time + +import six + +from .adb import ADBTimeoutError + + +class RemoteProcessMonitor: + """ + RemoteProcessMonitor provides a convenient way to run a remote process, + dump its log file, and wait for it to end. + """ + + def __init__( + self, + app_name, + device, + log, + message_logger, + remote_log_file, + remote_profile, + ): + self.app_name = app_name + self.device = device + self.log = log + self.remote_log_file = remote_log_file + self.remote_profile = remote_profile + self.counts = {} + self.counts["pass"] = 0 + self.counts["fail"] = 0 + self.counts["todo"] = 0 + self.last_test_seen = "RemoteProcessMonitor" + self.message_logger = message_logger + if self.device.is_file(self.remote_log_file): + self.device.rm(self.remote_log_file) + self.log.info("deleted remote log %s" % self.remote_log_file) + + def launch(self, app, debugger_info, test_url, extra_args, env, e10s): + """ + Start the remote activity. + """ + if self.app_name and self.device.process_exist(self.app_name): + self.log.info("%s is already running. Stopping..." % self.app_name) + self.device.stop_application(self.app_name) + args = [] + if debugger_info: + args.extend(debugger_info.args) + args.append(app) + args.extend(extra_args) + activity = "TestRunnerActivity" + self.device.launch_activity( + self.app_name, + activity_name=activity, + e10s=e10s, + moz_env=env, + extra_args=args, + url=test_url, + ) + return self.pid + + @property + def pid(self): + """ + Determine the pid of the remote process (or the first process with + the same name). + """ + procs = self.device.get_process_list() + # limit the comparison to the first 75 characters due to a + # limitation in processname length in android. + pids = [proc[0] for proc in procs if proc[1] == self.app_name[:75]] + if pids is None or len(pids) < 1: + return 0 + return pids[0] + + def read_stdout(self): + """ + Fetch the full remote log file, log any new content and return True if new + content is processed. + """ + try: + new_log_content = self.device.get_file( + self.remote_log_file, offset=self.stdout_len + ) + except ADBTimeoutError: + raise + except Exception as e: + self.log.error( + "%s | exception reading log: %s" % (self.last_test_seen, str(e)) + ) + return False + if not new_log_content: + return False + + self.stdout_len += len(new_log_content) + new_log_content = six.ensure_str(new_log_content, errors="replace") + + self.log_buffer += new_log_content + lines = self.log_buffer.split("\n") + lines = [l for l in lines if l] + + if lines: + if self.log_buffer.endswith("\n"): + # all lines are complete; no need to buffer + self.log_buffer = "" + else: + # keep the last (unfinished) line in the buffer + self.log_buffer = lines[-1] + del lines[-1] + if not lines: + return False + + for line in lines: + # This passes the line to the logger (to be logged or buffered) + if isinstance(line, six.text_type): + # if line is unicode - let's encode it to bytes + parsed_messages = self.message_logger.write( + line.encode("UTF-8", "replace") + ) + else: + # if line is bytes type, write it as it is + parsed_messages = self.message_logger.write(line) + + for message in parsed_messages: + if isinstance(message, dict): + if message.get("action") == "test_start": + self.last_test_seen = message["test"] + elif message.get("action") == "test_end": + self.last_test_seen = "{} (finished)".format(message["test"]) + elif message.get("action") == "suite_end": + self.last_test_seen = "Last test finished" + elif message.get("action") == "log": + line = message["message"].strip() + m = re.match(".*:\s*(\d*)", line) + if m: + try: + val = int(m.group(1)) + if "Passed:" in line: + self.counts["pass"] += val + self.last_test_seen = "Last test finished" + elif "Failed:" in line: + self.counts["fail"] += val + elif "Todo:" in line: + self.counts["todo"] += val + except ADBTimeoutError: + raise + except Exception: + pass + + return True + + def wait(self, timeout=None): + """ + Wait for the remote process to end (or for its activity to go to background). + While waiting, periodically retrieve the process output and print it. + If the process is still running but no output is received in *timeout* + seconds, return False; else, once the process exits/goes to background, + return True. + """ + self.log_buffer = "" + self.stdout_len = 0 + + timer = 0 + output_timer = 0 + interval = 10 + status = True + top = self.app_name + + # wait for log creation on startup + retries = 0 + while retries < 20 and not self.device.is_file(self.remote_log_file): + retries += 1 + time.sleep(1) + if self.device.is_file(self.remote_log_file): + # We must change the remote log's permissions so that the shell can read it. + self.device.chmod(self.remote_log_file, mask="666") + else: + self.log.warning( + "Failed wait for remote log: %s missing?" % self.remote_log_file + ) + + while top == self.app_name: + has_output = self.read_stdout() + if has_output: + output_timer = 0 + if self.counts["pass"] > 0: + interval = 0.5 + time.sleep(interval) + timer += interval + output_timer += interval + if timeout and output_timer > timeout: + status = False + break + if not has_output: + top = self.device.get_top_activity(timeout=60) + if top is None: + self.log.info("Failed to get top activity, retrying, once...") + top = self.device.get_top_activity(timeout=60) + + # Flush anything added to stdout during the sleep + self.read_stdout() + self.log.info("wait for %s complete; top activity=%s" % (self.app_name, top)) + if top == self.app_name: + self.log.info("%s unexpectedly found running. Killing..." % self.app_name) + self.kill() + if not status: + self.log.error( + "TEST-UNEXPECTED-FAIL | %s | " + "application timed out after %d seconds with no output" + % (self.last_test_seen, int(timeout)) + ) + return status + + def kill(self): + """ + End a troublesome remote process: Trigger ANR and breakpad dumps, then + force the application to end. + """ + + # Trigger an ANR report with "kill -3" (SIGQUIT) + try: + self.device.pkill(self.app_name, sig=3, attempts=1) + except ADBTimeoutError: + raise + except: # NOQA: E722 + pass + time.sleep(3) + + # Trigger a breakpad dump with "kill -6" (SIGABRT) + try: + self.device.pkill(self.app_name, sig=6, attempts=1) + except ADBTimeoutError: + raise + except: # NOQA: E722 + pass + + # Wait for process to end + retries = 0 + while retries < 3: + if self.device.process_exist(self.app_name): + self.log.info( + "%s still alive after SIGABRT: waiting..." % self.app_name + ) + time.sleep(5) + else: + break + retries += 1 + if self.device.process_exist(self.app_name): + try: + self.device.pkill(self.app_name, sig=9, attempts=1) + except ADBTimeoutError: + raise + except: # NOQA: E722 + self.log.error("%s still alive after SIGKILL!" % self.app_name) + if self.device.process_exist(self.app_name): + self.device.stop_application(self.app_name) + + # Test harnesses use the MOZ_CRASHREPORTER environment variables to suppress + # the interactive crash reporter, but that may not always be effective; + # check for and cleanup errant crashreporters. + crashreporter = "%s.CrashReporter" % self.app_name + if self.device.process_exist(crashreporter): + self.log.warning( + "%s unexpectedly found running. Killing..." % crashreporter + ) + try: + self.device.pkill(crashreporter) + except ADBTimeoutError: + raise + except: # NOQA: E722 + pass + if self.device.process_exist(crashreporter): + self.log.error("%s still running!!" % crashreporter) + + @staticmethod + def elf_arm(filename): + """ + Determine if the specified file is an ARM binary. + """ + data = open(filename, "rb").read(20) + return data[:4] == "\x7fELF" and ord(data[18]) == 40 # EM_ARM diff --git a/testing/mozbase/mozdevice/mozdevice/version_codes.py b/testing/mozbase/mozdevice/mozdevice/version_codes.py new file mode 100644 index 0000000000..c1d56c7b84 --- /dev/null +++ b/testing/mozbase/mozdevice/mozdevice/version_codes.py @@ -0,0 +1,70 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +""" +VERSION CODES of the android releases. + +See http://developer.android.com/reference/android/os/Build.VERSION_CODES.html. +""" +# Magic version number for a current development build, which has +# not yet turned into an official release. +CUR_DEVELOPMENT = 10000 + +# October 2008: The original, first, version of Android +BASE = 1 +# February 2009: First Android update, officially called 1.1 +BASE_1_1 = 2 +# May 2009: Android 1.5 +CUPCAKE = 3 +# September 2009: Android 1.6 +DONUT = 4 +# November 2009: Android 2.0 +ECLAIR = 5 +# December 2009: Android 2.0.1 +ECLAIR_0_1 = 6 +# January 2010: Android 2.1 +ECLAIR_MR1 = 7 +# June 2010: Android 2.2 +FROYO = 8 +# November 2010: Android 2.3 +GINGERBREAD = 9 +# February 2011: Android 2.3.3 +GINGERBREAD_MR1 = 10 +# February 2011: Android 3.0 +HONEYCOMB = 11 +# May 2011: Android 3.1 +HONEYCOMB_MR1 = 12 +# June 2011: Android 3.2 +HONEYCOMB_MR2 = 13 +# October 2011: Android 4.0 +ICE_CREAM_SANDWICH = 14 +# December 2011: Android 4.0.3 +ICE_CREAM_SANDWICH_MR1 = 15 +# June 2012: Android 4.1 +JELLY_BEAN = 16 +# November 2012: Android 4.2 +JELLY_BEAN_MR1 = 17 +# July 2013: Android 4.3 +JELLY_BEAN_MR2 = 18 +# October 2013: Android 4.4 +KITKAT = 19 +# Android 4.4W +KITKAT_WATCH = 20 +# Lollilop +LOLLIPOP = 21 +LOLLIPOP_MR1 = 22 +# Marshmallow +M = 23 +# Nougat +N = 24 +N_MR1 = 25 +# Oreo +O = 26 +O_MR1 = 27 +# Pie +P = 28 +# 10 +Q = 29 +# 11 +R = 30 diff --git a/testing/mozbase/mozdevice/setup.cfg b/testing/mozbase/mozdevice/setup.cfg new file mode 100644 index 0000000000..2a9acf13da --- /dev/null +++ b/testing/mozbase/mozdevice/setup.cfg @@ -0,0 +1,2 @@ +[bdist_wheel] +universal = 1 diff --git a/testing/mozbase/mozdevice/setup.py b/testing/mozbase/mozdevice/setup.py new file mode 100644 index 0000000000..4d876ab00c --- /dev/null +++ b/testing/mozbase/mozdevice/setup.py @@ -0,0 +1,34 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from setuptools import setup + +PACKAGE_NAME = "mozdevice" +PACKAGE_VERSION = "4.1.0" + +deps = ["mozlog >= 6.0"] + +setup( + name=PACKAGE_NAME, + version=PACKAGE_VERSION, + description="Mozilla-authored device management", + long_description="see https://firefox-source-docs.mozilla.org/mozbase/index.html", + classifiers=[ + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.5", + ], + # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers + keywords="", + author="Mozilla Automation and Testing Team", + author_email="tools@lists.mozilla.org", + url="https://wiki.mozilla.org/Auto-tools/Projects/Mozbase", + license="MPL", + packages=["mozdevice"], + include_package_data=True, + zip_safe=False, + install_requires=deps, + entry_points=""" + # -*- Entry points: -*- + """, +) diff --git a/testing/mozbase/mozdevice/tests/conftest.py b/testing/mozbase/mozdevice/tests/conftest.py new file mode 100644 index 0000000000..831090a428 --- /dev/null +++ b/testing/mozbase/mozdevice/tests/conftest.py @@ -0,0 +1,236 @@ +import sys +from random import randint, seed +from unittest.mock import patch + +import mozdevice +import pytest +from six import StringIO + +# set up required module-level variables/objects +seed(1488590) + + +def random_tcp_port(): + """Returns a pseudo-random integer generated from a seed. + + :returns: int: pseudo-randomly generated integer + """ + return randint(8000, 12000) + + +@pytest.fixture(autouse=True) +def mock_command_output(monkeypatch): + """Monkeypatches the ADBDevice.command_output() method call. + + Instead of calling the concrete method implemented in adb.py::ADBDevice, + this method simply returns a string representation of the command that was + received. + + As an exception, if the command begins with "forward tcp:0 ", this method + returns a mock port number. + + :param object monkeypatch: pytest provided fixture for mocking. + """ + + def command_output_wrapper(object, cmd, timeout): + """Actual monkeypatch implementation of the command_output method call. + + :param object object: placeholder object representing ADBDevice + :param str cmd: command to be executed + :param timeout: unused parameter to represent timeout threshold + :returns: string - string representation of command to be executed + int - mock port number (only used when cmd begins with "forward tcp:0 ") + """ + + if cmd[0] == "forward" and cmd[1] == "tcp:0": + return 7777 + + print(str(cmd)) + return str(cmd) + + monkeypatch.setattr(mozdevice.ADBDevice, "command_output", command_output_wrapper) + + +@pytest.fixture(autouse=True) +def mock_shell_output(monkeypatch): + """Monkeypatches the ADBDevice.shell_output() method call. + + Instead of returning the output of an adb call, this method will + return appropriate string output. Content of the string output is + in line with the calling method's expectations. + + :param object monkeypatch: pytest provided fixture for mocking. + """ + + def shell_output_wrapper( + object, cmd, env=None, cwd=None, timeout=None, enable_run_as=False + ): + """Actual monkeypatch implementation of the shell_output method call. + + :param object object: placeholder object representing ADBDevice + :param str cmd: command to be executed + :param env: contains the environment variable + :type env: dict or None + :param cwd: The directory from which to execute. + :type cwd: str or None + :param timeout: unused parameter tp represent timeout threshold + :param enable_run_as: bool determining if run_as <app> is to be used + :returns: string - string representation of a simulated call to adb + """ + if "pm list package error" in cmd: + return "Error: Could not access the Package Manager" + elif "pm list package none" in cmd: + return "" + elif "pm list package" in cmd: + apps = ["org.mozilla.fennec", "org.mozilla.geckoview_example"] + return ("package:{}\n" * len(apps)).format(*apps) + else: + print(str(cmd)) + return str(cmd) + + monkeypatch.setattr(mozdevice.ADBDevice, "shell_output", shell_output_wrapper) + + +@pytest.fixture(autouse=True) +def mock_is_path_internal_storage(monkeypatch): + """Monkeypatches the ADBDevice.is_path_internal_storage() method call. + + Instead of returning the outcome of whether the path provided is + internal storage or external, this will always return True. + + :param object monkeypatch: pytest provided fixture for mocking. + """ + + def is_path_internal_storage_wrapper(object, path, timeout=None): + """Actual monkeypatch implementation of the is_path_internal_storage() call. + + :param str path: The path to test. + :param timeout: The maximum time in + seconds for any spawned adb process to complete before + throwing an ADBTimeoutError. This timeout is per adb call. The + total time spent may exceed this value. If it is not + specified, the value set in the ADBDevice constructor is used. + :returns: boolean + + :raises: * ADBTimeoutError + * ADBError + """ + if "internal_storage" in path: + return True + return False + + monkeypatch.setattr( + mozdevice.ADBDevice, + "is_path_internal_storage", + is_path_internal_storage_wrapper, + ) + + +@pytest.fixture(autouse=True) +def mock_enable_run_as_for_path(monkeypatch): + """Monkeypatches the ADBDevice.enable_run_as_for_path(path) method. + + Always return True + + :param object monkeypatch: pytest provided fixture for mocking. + """ + + def enable_run_as_for_path_wrapper(object, path): + """Actual monkeypatch implementation of the enable_run_as_for_path() call. + + :param str path: The path to test. + :returns: boolean + """ + return True + + monkeypatch.setattr( + mozdevice.ADBDevice, "enable_run_as_for_path", enable_run_as_for_path_wrapper + ) + + +@pytest.fixture(autouse=True) +def mock_shell_bool(monkeypatch): + """Monkeypatches the ADBDevice.shell_bool() method call. + + Instead of returning the output of an adb call, this method will + return appropriate string output. Content of the string output is + in line with the calling method's expectations. + + :param object monkeypatch: pytest provided fixture for mocking. + """ + + def shell_bool_wrapper( + object, cmd, env=None, cwd=None, timeout=None, enable_run_as=False + ): + """Actual monkeypatch implementation of the shell_bool method call. + + :param object object: placeholder object representing ADBDevice + :param str cmd: command to be executed + :param env: contains the environment variable + :type env: dict or None + :param cwd: The directory from which to execute. + :type cwd: str or None + :param timeout: unused parameter tp represent timeout threshold + :param enable_run_as: bool determining if run_as <app> is to be used + :returns: string - string representation of a simulated call to adb + """ + print(cmd) + return str(cmd) + + monkeypatch.setattr(mozdevice.ADBDevice, "shell_bool", shell_bool_wrapper) + + +@pytest.fixture(autouse=True) +def mock_adb_object(): + """Patches the __init__ method call when instantiating ADBDevice. + + ADBDevice normally requires instantiated objects in order to execute + its commands. + + With a pytest-mock patch, we are able to mock the initialization of + the ADBDevice object. By yielding the instantiated mock object, + unit tests can be run that call methods that require an instantiated + object. + + :yields: ADBDevice - mock instance of ADBDevice object + """ + with patch.object(mozdevice.ADBDevice, "__init__", lambda self: None): + yield mozdevice.ADBDevice() + + +@pytest.fixture +def redirect_stdout_and_assert(): + """Redirects the stdout pipe temporarily to a StringIO stream. + + This is useful to assert on methods that do not return + a value, such as most ADBDevice methods. + + The original stdout pipe is preserved throughout the process. + + :returns: _wrapper method + """ + + def _wrapper(func, **kwargs): + """Implements the stdout sleight-of-hand. + + After preserving the original sys.stdout, it is switched + to use cStringIO.StringIO. + + Method with no return value is called, and the stdout + pipe is switched back to the original sys.stdout. + + The expected outcome is received as part of the kwargs. + This is asserted against a sanitized output from the method + under test. + + :param object func: method under test + :param dict kwargs: dictionary of function parameters + """ + original_stdout = sys.stdout + sys.stdout = testing_stdout = StringIO() + expected_text = kwargs.pop("text") + func(**kwargs) + sys.stdout = original_stdout + assert expected_text in testing_stdout.getvalue().rstrip() + + return _wrapper diff --git a/testing/mozbase/mozdevice/tests/manifest.ini b/testing/mozbase/mozdevice/tests/manifest.ini new file mode 100644 index 0000000000..208fd06608 --- /dev/null +++ b/testing/mozbase/mozdevice/tests/manifest.ini @@ -0,0 +1,6 @@ +[DEFAULT] +subsuite = mozbase +[test_socket_connection.py] +[test_is_app_installed.py] +[test_chown.py] +[test_escape_command_line.py] diff --git a/testing/mozbase/mozdevice/tests/test_chown.py b/testing/mozbase/mozdevice/tests/test_chown.py new file mode 100644 index 0000000000..1bbfcc5d8e --- /dev/null +++ b/testing/mozbase/mozdevice/tests/test_chown.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python + +import logging +from unittest.mock import patch + +import mozunit +import pytest + + +@pytest.mark.parametrize("boolean_value", [True, False]) +def test_set_chown_r_attribute( + mock_adb_object, redirect_stdout_and_assert, boolean_value +): + mock_adb_object._chown_R = boolean_value + assert mock_adb_object._chown_R == boolean_value + + +def test_chown_path_internal(mock_adb_object, redirect_stdout_and_assert): + """Tests whether attempt to chown internal path is ignored""" + with patch.object(logging, "getLogger") as mock_log: + mock_adb_object._logger = mock_log + + testing_parameters = { + "owner": "someuser", + "path": "internal_storage", + } + expected = "Ignoring attempt to chown external storage" + mock_adb_object.chown(**testing_parameters) + assert "".join(mock_adb_object._logger.method_calls[0][1]) != "" + assert "".join(mock_adb_object._logger.method_calls[0][1]) == expected + + +def test_chown_one_path(mock_adb_object, redirect_stdout_and_assert): + """Tests the path where only one path is provided.""" + # set up mock logging and self._chown_R attribute. + with patch.object(logging, "getLogger") as mock_log: + mock_adb_object._logger = mock_log + mock_adb_object._chown_R = True + + testing_parameters = { + "owner": "someuser", + "path": "/system", + } + command = "chown {owner} {path}".format(**testing_parameters) + testing_parameters["text"] = command + redirect_stdout_and_assert(mock_adb_object.chown, **testing_parameters) + + +def test_chown_one_path_with_group(mock_adb_object, redirect_stdout_and_assert): + """Tests the path where group is provided.""" + # set up mock logging and self._chown_R attribute. + with patch.object(logging, "getLogger") as mock_log: + mock_adb_object._logger = mock_log + mock_adb_object._chown_R = True + + testing_parameters = { + "owner": "someuser", + "path": "/system", + "group": "group_2", + } + command = "chown {owner}.{group} {path}".format(**testing_parameters) + testing_parameters["text"] = command + redirect_stdout_and_assert(mock_adb_object.chown, **testing_parameters) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozdevice/tests/test_escape_command_line.py b/testing/mozbase/mozdevice/tests/test_escape_command_line.py new file mode 100644 index 0000000000..112dd936c5 --- /dev/null +++ b/testing/mozbase/mozdevice/tests/test_escape_command_line.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +import mozunit + + +def test_escape_command_line(mock_adb_object, redirect_stdout_and_assert): + """Test _escape_command_line.""" + cases = { + # expected output : test input + "adb shell ls -l": ["adb", "shell", "ls", "-l"], + "adb shell 'ls -l'": ["adb", "shell", "ls -l"], + "-e 'if (true)'": ["-e", "if (true)"], + "-e 'if (x === \"hello\")'": ["-e", 'if (x === "hello")'], + "-e 'if (x === '\"'\"'hello'\"'\"')'": ["-e", "if (x === 'hello')"], + } + for expected, input in cases.items(): + assert mock_adb_object._escape_command_line(input) == expected + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozdevice/tests/test_is_app_installed.py b/testing/mozbase/mozdevice/tests/test_is_app_installed.py new file mode 100644 index 0000000000..a51836bc02 --- /dev/null +++ b/testing/mozbase/mozdevice/tests/test_is_app_installed.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +import mozunit +import pytest +from mozdevice import ADBError + + +def test_is_app_installed(mock_adb_object): + """Tests that is_app_installed returns True if app is installed.""" + assert mock_adb_object.is_app_installed("org.mozilla.geckoview_example") + + +def test_is_app_installed_not_installed(mock_adb_object): + """Tests that is_app_installed returns False if provided app_name + does not resolve.""" + assert not mock_adb_object.is_app_installed("some_random_name") + + +def test_is_app_installed_partial_name(mock_adb_object): + """Tests that is_app_installed returns False if provided app_name + is only a partial match.""" + assert not mock_adb_object.is_app_installed("fennec") + + +def test_is_app_installed_package_manager_error(mock_adb_object): + """Tests that is_app_installed is able to raise an exception.""" + with pytest.raises(ADBError): + mock_adb_object.is_app_installed("error") + + +def test_is_app_installed_no_installed_package_found(mock_adb_object): + """Tests that is_app_installed is able to handle scenario + where no installed packages are found.""" + assert not mock_adb_object.is_app_installed("none") + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozdevice/tests/test_socket_connection.py b/testing/mozbase/mozdevice/tests/test_socket_connection.py new file mode 100644 index 0000000000..1182737546 --- /dev/null +++ b/testing/mozbase/mozdevice/tests/test_socket_connection.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python + +import mozunit +import pytest +from conftest import random_tcp_port + + +@pytest.fixture(params=["tcp:{}".format(random_tcp_port()) for _ in range(5)]) +def select_test_port(request): + """Generate a list of ports to be used for testing.""" + yield request.param + + +def test_list_socket_connections_reverse(mock_adb_object): + assert [("['reverse',", "'--list']")] == mock_adb_object.list_socket_connections( + "reverse" + ) + + +def test_list_socket_connections_forward(mock_adb_object): + assert [("['forward',", "'--list']")] == mock_adb_object.list_socket_connections( + "forward" + ) + + +def test_create_socket_connection_reverse( + mock_adb_object, select_test_port, redirect_stdout_and_assert +): + _expected = "['reverse', '{0}', '{0}']".format(select_test_port) + redirect_stdout_and_assert( + mock_adb_object.create_socket_connection, + direction="reverse", + local=select_test_port, + remote=select_test_port, + text=_expected, + ) + + +def test_create_socket_connection_forward( + mock_adb_object, select_test_port, redirect_stdout_and_assert +): + _expected = "['forward', '{0}', '{0}']".format(select_test_port) + redirect_stdout_and_assert( + mock_adb_object.create_socket_connection, + direction="forward", + local=select_test_port, + remote=select_test_port, + text=_expected, + ) + + +def test_create_socket_connection_forward_adb_assigned_port( + mock_adb_object, select_test_port +): + result = mock_adb_object.create_socket_connection( + direction="forward", local="tcp:0", remote=select_test_port + ) + assert isinstance(result, int) and result == 7777 + + +def test_remove_socket_connections_reverse(mock_adb_object, redirect_stdout_and_assert): + _expected = "['reverse', '--remove-all']" + redirect_stdout_and_assert( + mock_adb_object.remove_socket_connections, direction="reverse", text=_expected + ) + + +def test_remove_socket_connections_forward(mock_adb_object, redirect_stdout_and_assert): + _expected = "['forward', '--remove-all']" + redirect_stdout_and_assert( + mock_adb_object.remove_socket_connections, direction="forward", text=_expected + ) + + +def test_legacy_forward(mock_adb_object, select_test_port, redirect_stdout_and_assert): + _expected = "['forward', '{0}', '{0}']".format(select_test_port) + redirect_stdout_and_assert( + mock_adb_object.forward, + local=select_test_port, + remote=select_test_port, + text=_expected, + ) + + +def test_legacy_forward_adb_assigned_port(mock_adb_object, select_test_port): + result = mock_adb_object.forward(local="tcp:0", remote=select_test_port) + assert isinstance(result, int) and result == 7777 + + +def test_legacy_reverse(mock_adb_object, select_test_port, redirect_stdout_and_assert): + _expected = "['reverse', '{0}', '{0}']".format(select_test_port) + redirect_stdout_and_assert( + mock_adb_object.reverse, + local=select_test_port, + remote=select_test_port, + text=_expected, + ) + + +def test_validate_port_invalid_prefix(mock_adb_object): + with pytest.raises(ValueError): + mock_adb_object._validate_port("{}".format("invalid"), is_local=True) + + +@pytest.mark.xfail +def test_validate_port_non_numerical_port_identifier(mock_adb_object): + with pytest.raises(AttributeError): + mock_adb_object._validate_port( + "{}".format("tcp:this:is:not:a:number"), is_local=True + ) + + +def test_validate_port_identifier_length_short(mock_adb_object): + with pytest.raises(ValueError): + mock_adb_object._validate_port("{}".format("tcp"), is_local=True) + + +def test_validate_direction(mock_adb_object): + with pytest.raises(ValueError): + mock_adb_object._validate_direction("{}".format("bad direction")) + + +if __name__ == "__main__": + mozunit.main() |