#!/usr/bin/env python3 # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. import hashlib import json import re import sys import webbrowser from typing import Union import bugzilla import click from click.utils import echo from qm_try_analysis import stackanalysis, utils from qm_try_analysis.logging import error, info, warning # Flag for toggling development mod DEV = True # Constants for Bugzilla URLs if DEV: BUGZILLA_BASE_URL = "https://bugzilla-dev.allizom.org/" else: BUGZILLA_BASE_URL = "https://bugzilla.mozilla.org/" BUGZILLA_API_URL = BUGZILLA_BASE_URL + "rest/" BUGZILLA_ATTACHMENT_URL = BUGZILLA_BASE_URL + "attachment.cgi?id=" BUGZILLA_BUG_URL = BUGZILLA_BASE_URL + "show_bug.cgi?id=" # Constants for static bugs QM_TRY_FAILURES_BUG = 1702411 WARNING_STACKS_BUG = 1711703 # Regex pattern for parsing anchor strings ANCHOR_REGEX_PATTERN = re.compile(r"([^:]+):([^:]+)?:*([^:]+)?") @click.command() @click.option( "-k", "--key", help="Your personal Bugzilla API key", required=True, ) @click.option( "--stacksfile", type=click.File("rb"), help="The output file of the previous analysis run. You only have to specify this, if the previous run does not include this info.", ) @click.option( "--open-modified/--no-open-modified", default=True, help="Whether to open modified bugs in your default browser after updating them.", ) @click.option( "-w", "--workdir", type=click.Path(file_okay=False, exists=True, writable=True), default="output", help="Working directory", ) def report_qm_failures(key, stacksfile, open_modified, workdir): """ Report QM failures to Bugzilla based on stack analysis. """ run = utils.getLastRunFromExecutionFile(workdir) # Check for valid execution file from the previous run if not {"errorfile", "warnfile"} <= run.keys(): error("No analyzable execution from the previous run of analyze found.") echo("Did you remember to run `poetry run qm-try-analysis analyze`?") sys.exit(2) # Handle missing stacksfile if not stacksfile: if "stacksfile" not in run: error( "The previous analyze run did not contain the location of the stacksfile." ) echo('Please provide the file location using the "--stacksfile" option.') sys.exit(2) stacksfile = open(run["stacksfile"], "rb") # Create Bugzilla client bugzilla_client = bugzilla.Bugzilla(url=BUGZILLA_API_URL, api_key=key) # Initialize report data report = run.get("report", {}) run["report"] = report attachment_id = report.get("stacksfile_attachment", None) reported = report.get("reported", []) report["reported"] = reported def post_comment(bug_id, comment): """ Post a comment to a Bugzilla bug. """ data = {"id": bug_id, "comment": comment, "is_markdown": True} res = bugzilla_client._post(f"bug/{bug_id}/comment", json.dumps(data)) return res["id"] # Handle missing attachment ID if not attachment_id: attachment = bugzilla.DotDict() attachment.file_name = f"qmstacks_until_{run['lasteventtime']}.txt" attachment.summary = attachment.file_name attachment.content_type = "text/plain" attachment.data = stacksfile.read().decode() res = bugzilla_client.post_attachment(QM_TRY_FAILURES_BUG, attachment) attachment_id = next(iter(res["attachments"].values()))["id"] report["stacksfile_attachment"] = attachment_id utils.updateLastRunToExecutionFile(workdir, run) info( f'Created attachment for "{attachment.file_name}": {BUGZILLA_ATTACHMENT_URL + str(attachment_id)}.' ) def generate_comment(stacks): """ Generate a comment for Bugzilla based on error stacks. """ comment = f"Taken from Attachment {attachment_id}\n\n" comment += stackanalysis.printStacks(stacks) return comment # Handle missing warnings comment if "warnings_comment" not in report: warning_stacks = utils.readJSONFile(run["warnfile"]) warning_stacks = filter(lambda stack: stack["hit_count"] >= 100, warning_stacks) comment = generate_comment(warning_stacks) comment_id = post_comment(WARNING_STACKS_BUG, comment) report["warnings_comment"] = comment_id utils.updateLastRunToExecutionFile(workdir, run) info("Created comment for warning stacks.") error_stacks = utils.readJSONFile(run["errorfile"]) def reduce(search_results, by: str) -> Union[int, None]: """ Reduce bug search results automatically or based on user input. """ anchor = by search_results = remove_duplicates(search_results, bugzilla_client) if not search_results: return if len(search_results) == 1: return search_results[0]["id"] echo(f'Multiple bugs found for anchor "{anchor}":') for i, result in enumerate(search_results, start=1): echo( f"{i}.{' [closed]' if result['resolution'] != '' else ''} {BUGZILLA_BUG_URL + str(result['id'])} - {result['summary']}" ) choice = click.prompt( "Enter the number of the bug you want to use", type=click.Choice( [str(i) for i in range(1, len(search_results) + 1)] + ["skip"] ), default="skip", show_default=True, confirmation_prompt="Please confirm the selected choice", ) if choice == "skip": return return search_results[int(choice) - 1]["id"] anchors = stackanalysis.groupStacksForAnchors(error_stacks) for anchor in anchors: if hash_str(anchor) in reported: info(f'Skipping anchor "{anchor}" since it has already been reported.') continue if not (match := ANCHOR_REGEX_PATTERN.match(anchor)): warning(f'"{anchor}" did not match the regex pattern.') if "Unknown" in match.group(2): warning(f'Skipping "{anchor}" since it is not a valid anchor.') continue search_string = " ".join(filter(None, match.groups())) search_results = bugzilla_client.search_bugs( [{"product": "Core", "summary": search_string}] )["bugs"] if bug_id := reduce(search_results, by=anchor): info(f'Found bug {BUGZILLA_BUG_URL + str(bug_id)} for anchor "{anchor}".') else: warning(f'No bug found for anchor "{anchor}".') if not click.confirm("Would you like to create one?"): continue bug = bugzilla.DotDict() bug.product = "Core" bug.component = "Storage: Quota Manager" bug.summary = f"[QM_TRY] Failures in {anchor}" bug.description = f"This bug keeps track of the semi-automatic monitoring of QM_TRY failures in `{anchor}`" bug["type"] = "defect" bug.blocks = QM_TRY_FAILURES_BUG bug.version = "unspecified" bug_id = bugzilla_client.post_bug(bug)["id"] info(f'Created bug {BUGZILLA_BUG_URL + str(bug_id)} for anchor "{anchor}".') comment = generate_comment(anchors[anchor]["stacks"]) comment_id = post_comment(bug_id, comment) reported.append(hash_str(anchor)) utils.updateLastRunToExecutionFile(workdir, run) if open_modified: comment_seq_number = bugzilla_client.get_comment(comment_id)["comments"][ str(comment_id) ]["count"] webbrowser.open( BUGZILLA_BUG_URL + str(bug_id) + "#c" + str(comment_seq_number) ) def hash_str(s): """ Hash a string using MD5. """ encoded_str = s.encode("utf-8") return int(hashlib.md5(encoded_str).hexdigest(), 16) def remove_duplicates(search_results, bugzilla_client): """ Remove duplicate bugs in search results. """ resolved_bugs = set(bug["id"] for bug in search_results if not bug.get("dupe_of")) def resolve_if_dupe(bug): if not (dupe_of := bug.get("dupe_of")): return bug if dupe_of in resolved_bugs: return None remote = resolve_if_dupe(bugzilla_client.get_bug(dupe_of)) if remote: resolved_bugs.add(remote["id"]) return remote return [non_dupe for bug in search_results if (non_dupe := resolve_if_dupe(bug))] if __name__ == "__main__": report_qm_failures()