# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import os import platform import re import signal import subprocess import sys from mozlint import result from mozlint.pathutils import expand_exclusions here = os.path.abspath(os.path.dirname(__file__)) def default_bindir(): # We use sys.prefix to find executables as that gets modified with # virtualenv's activate_this.py, whereas sys.executable doesn't. if platform.system() == "Windows": return os.path.join(sys.prefix, "Scripts") else: return os.path.join(sys.prefix, "bin") def get_black_version(binary): """ Returns found binary's version """ try: output = subprocess.check_output( [binary, "--version"], stderr=subprocess.STDOUT, universal_newlines=True, ) except subprocess.CalledProcessError as e: output = e.output try: # Accept `black.EXE, version ...` on Windows. # for old version of black, the output is # black, version 21.4b2 # From black 21.11b1, the output is like # black, 21.11b1 (compiled: no) return re.match(r"black.*,( version)? (\S+)", output)[2] except TypeError as e: print(f"Could not parse the version '{output}'") print(f"Error: {e}") def parse_issues(config, output, paths, *, log): would_reformat = re.compile("^would reformat (.*)$", re.I) reformatted = re.compile("^reformatted (.*)$", re.I) cannot_reformat = re.compile("^error: cannot format (.*?): (.*)$", re.I) results = [] for l in output.split(b"\n"): line = l.decode("utf-8").rstrip("\r\n") if line.startswith("All done!") or line.startswith("Oh no!"): break match = would_reformat.match(line) if match: res = {"path": match.group(1), "level": "error"} results.append(result.from_config(config, **res)) continue match = reformatted.match(line) if match: res = {"path": match.group(1), "level": "warning", "message": "reformatted"} results.append(result.from_config(config, **res)) continue match = cannot_reformat.match(line) if match: res = {"path": match.group(1), "level": "error", "message": match.group(2)} results.append(result.from_config(config, **res)) continue log.debug(f"Unhandled line: {line}") return results def run_process(config, cmd): orig = signal.signal(signal.SIGINT, signal.SIG_IGN) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) signal.signal(signal.SIGINT, orig) try: output, _ = proc.communicate() proc.wait() except KeyboardInterrupt: proc.kill() return output def run_black(config, paths, fix=None, *, log, virtualenv_bin_path): fixed = 0 binary = os.path.join(virtualenv_bin_path or default_bindir(), "black") log.debug(f"Black version {get_black_version(binary)}") cmd_args = [binary] if not fix: cmd_args.append("--check") if platform.system() == "Windows": MAX_PATHS_PER_JOB = ( 50 # set a max size to prevent command lines that are too long on Windows ) chunked_paths = [ paths[i : i + MAX_PATHS_PER_JOB] for i in range(0, len(paths), MAX_PATHS_PER_JOB) ] else: chunked_paths = [paths] all_output = [] for paths_chunk in chunked_paths: base_command = cmd_args + paths_chunk log.debug("Command: {}".format(" ".join(base_command))) output = parse_issues( config, run_process(config, base_command), paths_chunk, log=log ) all_output.extend(output) # black returns an issue for fixed files as well for eachIssue in output: if eachIssue.message == "reformatted": fixed += 1 return {"results": all_output, "fixed": fixed} def lint(paths, config, fix=None, **lintargs): files = list(expand_exclusions(paths, config, lintargs["root"])) return run_black( config, files, fix=fix, log=lintargs["log"], virtualenv_bin_path=lintargs.get("virtualenv_bin_path"), )