322 lines
10 KiB
Python
Executable file
322 lines
10 KiB
Python
Executable file
#!/usr/bin/python3
|
|
# Copyright (c) 2020 Jelmer Vernooij <jelmer@debian.org>
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 3
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# See file /usr/share/common-licenses/GPL-3 for more details.
|
|
#
|
|
# pylint: disable=invalid-name
|
|
# pylint: enable=invalid-name
|
|
|
|
"""
|
|
Command-line interface for the Debian Janitor.
|
|
|
|
See https://janitor.debian.net/
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import sys
|
|
from typing import Any, Optional
|
|
from urllib.error import HTTPError
|
|
from urllib.parse import quote, urlencode
|
|
from urllib.request import Request, urlopen
|
|
|
|
from debian.changelog import Changelog
|
|
|
|
import devscripts
|
|
|
|
DEFAULT_API_URL = "https://janitor.debian.net/api/"
|
|
USER_AGENT = f"devscripts janitor cli ({devscripts.__version__})"
|
|
DEFAULT_URLLIB_TIMEOUT = 30
|
|
|
|
|
|
def _get_json_url(http_url: str, timeout: int = DEFAULT_URLLIB_TIMEOUT) -> Any:
|
|
headers = {"User-Agent": USER_AGENT, "Accept": "application/json"}
|
|
logging.debug("Retrieving %s", http_url)
|
|
with urlopen(Request(http_url, headers=headers), timeout=timeout) as resp:
|
|
http_contents = resp.read()
|
|
return json.loads(http_contents)
|
|
|
|
|
|
def schedule(source, campaign, api_url=DEFAULT_API_URL):
|
|
"""Schedule a new run for a package.
|
|
|
|
Args:
|
|
source: the source package name
|
|
campaign: the campaign to schedule for
|
|
"""
|
|
url = f"{api_url}{quote(campaign)}/pkg/{quote(source)}/schedule"
|
|
headers = {"User-Agent": USER_AGENT}
|
|
req = Request(url, headers=headers, method="POST")
|
|
try:
|
|
with urlopen(req) as resp:
|
|
resp = json.load(resp)
|
|
except HTTPError as err:
|
|
if err.code == 404:
|
|
raise NoSuchSource(json.loads(err.read())["reason"]) from err
|
|
raise
|
|
estimated_duration = resp["estimated_duration_seconds"]
|
|
queue_position = resp["queue_position"]
|
|
queue_wait_time = resp["queue_wait_time"]
|
|
return (estimated_duration, queue_position, queue_wait_time)
|
|
|
|
|
|
class MissingDiffError(Exception):
|
|
"""There is no diff for the specified package/campaign combination."""
|
|
|
|
|
|
class NoSuchSource(Exception):
|
|
"""There is no source package known with the specified name."""
|
|
|
|
|
|
def diff(source, campaign, api_url=DEFAULT_API_URL):
|
|
"""Retrieve the source diff for a package/campaign.
|
|
|
|
Args:
|
|
source: the source package name
|
|
campaign: the campaign to retrieve
|
|
Returns:
|
|
the diff as a bytestring
|
|
Raises:
|
|
MissingDiffError: If the diff was missing
|
|
(source not valid, campaign not valid, no runs yet, etc)
|
|
"""
|
|
url = f"{api_url}{quote(campaign)}/pkg/{quote(source)}/diff"
|
|
headers = {"User-Agent": USER_AGENT, "Accept": "text/plain"}
|
|
req = Request(url, headers=headers)
|
|
try:
|
|
with urlopen(req) as resp:
|
|
data = resp.read()
|
|
except HTTPError as err:
|
|
if err.code == 404:
|
|
raise MissingDiffError(err.read().decode()) from err
|
|
raise err
|
|
return data
|
|
|
|
|
|
def merge(
|
|
source: str, campaign: str, api_url: str = DEFAULT_API_URL, force: bool = False
|
|
): # pylint: disable=R0915
|
|
"""Merge changes from a campaign.
|
|
|
|
Args:
|
|
source: the source package name
|
|
campaign: applicable campaign
|
|
api_url: API URL
|
|
"""
|
|
url = f"{api_url}{quote(campaign)}/pkg/{quote(source)}"
|
|
try:
|
|
result = _get_json_url(url)
|
|
except HTTPError as err:
|
|
if err.code == 404:
|
|
logging.warning("No runs for %s/%s", source, campaign)
|
|
return 1
|
|
raise
|
|
|
|
if result["result_code"] != "success":
|
|
if force:
|
|
logging.fatal(
|
|
"Last run was not successful: %s; run with --force to merge anyway.",
|
|
result["result_code"],
|
|
)
|
|
return 1
|
|
logging.warning("Last run was not success: %s, merging anyway.")
|
|
|
|
remotes = subprocess.check_output(["git", "remote"], text=True).splitlines(False)
|
|
if "debian-janitor" not in remotes:
|
|
logging.info("Adding debian-janitor remote")
|
|
subprocess.check_call(
|
|
[
|
|
"git",
|
|
"remote",
|
|
"add",
|
|
"debian-janitor",
|
|
f"https://janitor.debian.net/git/{source}",
|
|
]
|
|
)
|
|
else:
|
|
logging.debug("debian-janitor already remote exists")
|
|
|
|
if len(result["branches"]) > 1:
|
|
logging.fatal(
|
|
"Merging changes with multiple branches is currently not supported"
|
|
)
|
|
return 1
|
|
|
|
if len(result["branches"]) < 1:
|
|
logging.fatal("No branches to merge")
|
|
return 1
|
|
|
|
# TODO(jelmer): Fetch tags
|
|
|
|
ret = 0
|
|
for role, _details in result["branches"].items():
|
|
try:
|
|
subprocess.check_call(
|
|
["git", "pull", "debian-janitor", f"{campaign}/{role or 'main'}"]
|
|
)
|
|
except subprocess.CalledProcessError:
|
|
# Git would have already printed an error to stderr
|
|
ret = 1
|
|
|
|
return ret
|
|
|
|
|
|
def review(
|
|
source: str,
|
|
campaign: str,
|
|
verdict: str,
|
|
comment: Optional[str] = None,
|
|
api_url=DEFAULT_API_URL,
|
|
) -> int:
|
|
"""Submit a review of a package.
|
|
|
|
Args:
|
|
source: the source package name
|
|
campaign: applicable campaign
|
|
verdict: a verdict ("approved", "abstained", "rejected", "reschedule")
|
|
comment: optional comment explaining the verdict
|
|
"""
|
|
url = f"{api_url}{quote(campaign)}/pkg/{quote(source)}"
|
|
headers = {"User-Agent": USER_AGENT, "Accept": "text/plain"}
|
|
data = {"review-status": verdict}
|
|
if comment:
|
|
data["review-comment"] = comment
|
|
req = Request(url, headers=headers, method="POST", data=urlencode(data).encode())
|
|
with urlopen(req) as resp:
|
|
resp.read()
|
|
return 0
|
|
|
|
|
|
def status(source: str, campaign: str, api_url: str = DEFAULT_API_URL) -> int:
|
|
"""Print the status for a package.
|
|
|
|
Args:
|
|
source: the source package name
|
|
campaign: applicable campaign
|
|
"""
|
|
url = f"{api_url}{quote(campaign)}/pkg/{quote(source)}"
|
|
try:
|
|
data = _get_json_url(url)
|
|
except HTTPError as err:
|
|
if err.code == 404:
|
|
logging.info("No relevant runs.")
|
|
# TODO(jelmer): print info about next scheduled run and command?
|
|
return 2
|
|
raise
|
|
if data.get("failure"):
|
|
if data["failure"]["transient"]:
|
|
transient = " (transient)"
|
|
else:
|
|
transient = ""
|
|
logging.warning("Failure stage: %s%s", data["failure"]["stage"], transient)
|
|
return 1
|
|
return 0
|
|
|
|
|
|
def main(argv): # pylint: disable=R0911,R0912,R0915
|
|
"""Handle command-line arguments."""
|
|
parser = argparse.ArgumentParser("janitor")
|
|
parser.add_argument("--debug", action="store_true")
|
|
parser.add_argument(
|
|
"--api-url", type=str, help="API endpoint to talk to", default=DEFAULT_API_URL
|
|
)
|
|
subparsers = parser.add_subparsers(help="sub-command help", dest="subcommand")
|
|
schedule_parser = subparsers.add_parser("schedule")
|
|
schedule_parser.add_argument("campaign")
|
|
schedule_parser.add_argument("source", help="Source package name", nargs="?")
|
|
diff_parser = subparsers.add_parser("diff")
|
|
diff_parser.add_argument("campaign")
|
|
diff_parser.add_argument("source", help="Source package name", nargs="?")
|
|
merge_parser = subparsers.add_parser("merge")
|
|
merge_parser.add_argument("campaign")
|
|
review_parser = subparsers.add_parser("review")
|
|
review_parser.add_argument("campaign")
|
|
review_parser.add_argument("--source", help="Source package name")
|
|
review_parser.add_argument(
|
|
"verdict",
|
|
help="Verdict",
|
|
choices=["approved", "rejected", "abstained", "reschedule"],
|
|
type=str,
|
|
)
|
|
review_parser.add_argument("comment", help="Comment explaining review", nargs="?")
|
|
status_parser = subparsers.add_parser("status")
|
|
status_parser.add_argument("campaign")
|
|
status_parser.add_argument("source", help="Source package name", nargs="?")
|
|
args = parser.parse_args(argv)
|
|
logging.basicConfig(
|
|
format="%(message)s", level=logging.INFO if not args.debug else logging.DEBUG
|
|
)
|
|
|
|
def _get_local_source() -> str:
|
|
try:
|
|
with open("debian/changelog", "r", encoding="utf-8") as changelog_file:
|
|
changelog = Changelog(changelog_file)
|
|
except FileNotFoundError:
|
|
parser.error("not in Debian package, and no source package name specified")
|
|
logging.info("Using source package: %s", changelog.package)
|
|
return changelog.package
|
|
|
|
if args.subcommand == "schedule":
|
|
if args.source is None:
|
|
args.source = _get_local_source()
|
|
try:
|
|
(est_duration, pos, wait_time) = schedule(
|
|
args.source, args.campaign, api_url=args.api_url
|
|
)
|
|
except NoSuchSource as err:
|
|
logging.fatal("%s", err.args[0])
|
|
return 1
|
|
if pos is not None:
|
|
logging.info(
|
|
"Scheduled. Estimated duration: %.2fs,"
|
|
" queue position: %d (wait time: %.2f)",
|
|
est_duration,
|
|
pos,
|
|
wait_time,
|
|
)
|
|
else:
|
|
logging.info("Scheduled.")
|
|
return 0
|
|
if args.subcommand == "diff":
|
|
if args.source is None:
|
|
args.source = _get_local_source()
|
|
try:
|
|
sys.stdout.buffer.write(
|
|
diff(args.source, args.campaign, api_url=args.api_url)
|
|
)
|
|
sys.stdout.flush()
|
|
except MissingDiffError as err:
|
|
logging.fatal("%s", err.args[0])
|
|
return 1
|
|
return 0
|
|
if args.subcommand == "merge":
|
|
source = _get_local_source()
|
|
return merge(source, args.campaign, api_url=args.api_url)
|
|
if args.subcommand == "review":
|
|
if args.source is None:
|
|
args.source = _get_local_source()
|
|
return review(
|
|
args.source, args.campaign, args.verdict, args.comment, api_url=args.api_url
|
|
)
|
|
if args.subcommand == "status":
|
|
if args.source is None:
|
|
args.source = _get_local_source()
|
|
return status(args.source, args.campaign, api_url=args.api_url)
|
|
parser.print_usage()
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main(sys.argv[1:]))
|