diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-17 09:07:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-17 09:07:14 +0000 |
commit | fcffaf214966be45ca78fecc6aaa5fae0b42d05a (patch) | |
tree | 666b8c46f7fdbb3a78947221170909a0ccafbb41 /gita | |
parent | Adding upstream version 0.16.6.1. (diff) | |
download | gita-fcffaf214966be45ca78fecc6aaa5fae0b42d05a.tar.xz gita-fcffaf214966be45ca78fecc6aaa5fae0b42d05a.zip |
Adding upstream version 0.16.7.2.upstream/0.16.7.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'gita')
-rw-r--r-- | gita/__init__.py | 19 | ||||
-rw-r--r-- | gita/__main__.py | 77 | ||||
-rw-r--r-- | gita/cmds.json | 11 | ||||
-rw-r--r-- | gita/info.py | 78 | ||||
-rw-r--r-- | gita/io.py | 33 | ||||
-rw-r--r-- | gita/utils.py | 14 |
6 files changed, 185 insertions, 47 deletions
diff --git a/gita/__init__.py b/gita/__init__.py index 33c0f41..de4873d 100644 --- a/gita/__init__.py +++ b/gita/__init__.py @@ -1,3 +1,18 @@ -import pkg_resources +import sys -__version__ = pkg_resources.get_distribution("gita").version + +def get_version() -> str: + try: + import pkg_resources + except ImportError: + try: + from importlib.metadata import version + except ImportError: + print("cannot determine version", sys.version_info) + else: + return version("gita") + else: + return pkg_resources.get_distribution("gita").version + + +__version__ = get_version() diff --git a/gita/__main__.py b/gita/__main__.py index b2bc32b..6809f15 100644 --- a/gita/__main__.py +++ b/gita/__main__.py @@ -18,14 +18,15 @@ import os import sys import csv import argparse +import argcomplete import subprocess from functools import partial -import pkg_resources from itertools import chain from pathlib import Path import glob +from typing import Dict, Optional -from . import utils, info, common +from . import utils, info, common, io, get_version def _group_name(name: str, exclude_old_names=True) -> str: @@ -146,13 +147,28 @@ def f_info(args: argparse.Namespace): with open(csv_config, "w", newline="") as f: writer = csv.writer(f) writer.writerow(to_display) + elif cmd == "set-length": + csv_config = common.get_config_fname("layout.csv") + print(f"Settings are in {csv_config}") + defaults = { + "branch": 19, + "symbols": 5, + "branch_name": 27, + "commit_msg": 0, + "commit_time": 0, # 0 means no limit + "path": 30, + } + with open(csv_config, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=defaults) + writer.writeheader() + writer.writerow(defaults) def f_clone(args: argparse.Namespace): if args.dry_run: if args.from_file: - for url, repo_name, abs_path in utils.parse_clone_config(args.clonee): + for url, repo_name, abs_path in io.parse_clone_config(args.clonee): print(f"git clone {url} {abs_path}") else: print(f"git clone {args.clonee}") @@ -172,28 +188,35 @@ def f_clone(args: argparse.Namespace): f_add(args) return + # TODO: add repos to group too + repos, groups = io.parse_clone_config(args.clonee) if args.preserve_path: utils.exec_async_tasks( - utils.run_async(repo_name, path, ["git", "clone", url, abs_path]) - for url, repo_name, abs_path in utils.parse_clone_config(args.clonee) + utils.run_async(repo_name, path, ["git", "clone", r["url"], r["path"]]) + for repo_name, r in repos.items() ) else: utils.exec_async_tasks( - utils.run_async(repo_name, path, ["git", "clone", url]) - for url, repo_name, _ in utils.parse_clone_config(args.clonee) + utils.run_async(repo_name, path, ["git", "clone", r["url"]]) + for repo_name, r in repos.items() ) def f_freeze(args): - repos = utils.get_repos() + """ + print repo and group information for future cloning + """ ctx = utils.get_context() if args.group is None and ctx: args.group = ctx.stem + repos = utils.get_repos() + group_name = args.group group_repos = None - if args.group: # only display repos in this group - group_repos = utils.get_groups()[args.group]["repos"] + if group_name: # only display repos in this group + group_repos = utils.get_groups()[group_name]["repos"] repos = {k: repos[k] for k in group_repos if k in repos} seen = {""} + # print(repos) for name, prop in repos.items(): path = prop["path"] url = "" @@ -211,7 +234,16 @@ def f_freeze(args): url = parts[1] if url not in seen: seen.add(url) - print(f"{url},{name},{path}") + # TODO: add another field to distinguish regular repo or worktree or submodule + print(f"{url},{name},{path},") + # group information: these lines don't have URL + if group_name: + group_path = utils.get_groups()[group_name]["path"] + print(f",{group_name},{group_path},{'|'.join(group_repos)}") + else: # show all groups + for gname, g in utils.get_groups().items(): + group_repos = "|".join(g["repos"]) + print(f",{gname},{g['path']},{group_repos}") def f_ll(args: argparse.Namespace): @@ -435,8 +467,9 @@ def main(argv=None): title="sub-commands", help="additional help with sub-command -h" ) - version = pkg_resources.require("gita")[0].version - p.add_argument("-v", "--version", action="version", version=f"%(prog)s {version}") + p.add_argument( + "-v", "--version", action="version", version=f"%(prog)s {get_version()}" + ) # bookkeeping sub-commands p_add = subparsers.add_parser("add", description="add repo(s)", help="add repo(s)") @@ -598,11 +631,17 @@ def main(argv=None): info_cmds.add_parser("rm", description="Disable information item.").add_argument( "info_item", choices=info.ALL_INFO_ITEMS, help="information item to delete" ) + info_cmds.add_parser( + "set-length", + description="Set default column widths for information items. " + "The settings are in layout.csv", + ) ll_doc = f""" status symbols: +: staged changes *: unstaged changes - _: untracked files/folders + ?: untracked files/folders + $: stashed changes branch colors: {info.Color.white}white{info.Color.end}: local has no remote @@ -780,17 +819,18 @@ def main(argv=None): cmds = utils.get_cmds_from_files() for name, data in cmds.items(): help = data.get("help") + repo_help = help cmd = data["cmd"] if data.get("allow_all"): choices = utils.get_choices() nargs = "*" - help += " for all repos or" + repo_help += " for all repos or" else: choices = utils.get_repos().keys() | utils.get_groups().keys() nargs = "+" - help += " for the chosen repo(s) or group(s)" - sp = subparsers.add_parser(name, description=help) - sp.add_argument("repo", nargs=nargs, choices=choices, help=help) + repo_help += " for the chosen repo(s) or group(s)" + sp = subparsers.add_parser(name, description=help, help=help) + sp.add_argument("repo", nargs=nargs, choices=choices, help=repo_help) is_shell = bool(data.get("shell")) sp.add_argument( "-s", @@ -805,6 +845,7 @@ def main(argv=None): cmd = cmd.split() sp.set_defaults(func=f_git_cmd, cmd=cmd) + argcomplete.autocomplete(p) args = p.parse_args(argv) args.async_blacklist = { diff --git a/gita/cmds.json b/gita/cmds.json index eadda81..c1890c5 100644 --- a/gita/cmds.json +++ b/gita/cmds.json @@ -22,8 +22,13 @@ "cmd": "git log -1 HEAD", "help": "show log information of HEAD" }, -"log": - {"cmd": "git log", +"lo":{ + "cmd": "git log --oneline -7", + "allow_all": true, + "help": "show one-line log for the latest 7 commits" + }, +"log":{ + "cmd": "git log", "disable_async": true, "help": "show logs" }, @@ -77,10 +82,12 @@ }, "stat":{ "cmd": "git diff --stat", + "allow_all": true, "help": "show edit statistics" }, "st":{ "cmd": "git status", + "allow_all": true, "help": "show status" }, "tag":{ diff --git a/gita/info.py b/gita/info.py index 10d8bea..0a253a0 100644 --- a/gita/info.py +++ b/gita/info.py @@ -9,6 +9,40 @@ from typing import Tuple, List, Callable, Dict from . import common +class Truncate: + """ + Reads in user layout.csv file and uses the values there + to truncate the string passed in. If the file doesn't + exist or the requested field doesn't exist then don't + truncate + """ + + widths = {} + + def __init__(self): + csv_config = Path(common.get_config_fname("layout.csv")) + if csv_config.is_file(): + with open(csv_config, "r") as f: + reader = csv.DictReader(f) + self.widths = next(reader) + + # Ensure the Dict type is Dict[str, int] to reduce casting elsewhere + for e, width in self.widths.items(): + self.widths[e] = int(width) + + def truncate(self, field: str, message: str): + # 0 means no width limit applied + if not self.widths.get(field): + return message + + length = 3 if self.widths[field] < 3 else self.widths[field] + return ( + message[: length - 3] + "..." + if len(message) > length + else message.ljust(length) + ) + + class Color(Enum): """ Terminal color @@ -113,8 +147,8 @@ def get_info_items() -> List[str]: return display_items -def get_path(prop: Dict[str, str]) -> str: - return f'{Color.cyan}{prop["path"]}{Color.end}' +def get_path(prop: Dict[str, str], truncator: Truncate) -> str: + return f'{Color.cyan}{truncator.truncate("path", prop["path"])}{Color.end}' # TODO: do we need to add the flags here too? @@ -164,7 +198,21 @@ def has_untracked(flags: List[str], path) -> bool: return bool(result.stdout) -def get_commit_msg(prop: Dict[str, str]) -> str: +def has_stashed(flags: List[str], path) -> bool: + """ + Return True if stashed content exists + """ + # FIXME: this doesn't work for repos like worktrees, bare, etc + p = Path(path) / ".git" / "logs" / "refs" / "stash" + got = False + try: + got = p.is_file() + except Exception: + pass + return got + + +def get_commit_msg(prop: Dict[str, str], truncator: Truncate) -> str: """ Return the last commit message. """ @@ -177,10 +225,10 @@ def get_commit_msg(prop: Dict[str, str]) -> str: universal_newlines=True, cwd=prop["path"], ) - return result.stdout.strip() + return truncator.truncate("commit_msg", result.stdout.strip()) -def get_commit_time(prop: Dict[str, str]) -> str: +def get_commit_time(prop: Dict[str, str], truncator: Truncate) -> str: """ Return the last commit time in parenthesis. """ @@ -192,13 +240,14 @@ def get_commit_time(prop: Dict[str, str]) -> str: universal_newlines=True, cwd=prop["path"], ) - return f"({result.stdout.strip()})" + return truncator.truncate("commit_time", f"({result.stdout.strip()})") default_symbols = { "dirty": "*", "staged": "+", "untracked": "?", + "stashed": "$", "local_ahead": "↑", "remote_ahead": "↓", "diverged": "⇕", @@ -223,11 +272,11 @@ def get_symbols() -> Dict[str, str]: return default_symbols -def get_repo_status(prop: Dict[str, str], no_colors=False) -> str: - branch = get_head(prop["path"]) - dirty, staged, untracked, situ = _get_repo_status(prop) +def get_repo_status(prop: Dict[str, str], truncator: Truncate, no_colors=False) -> str: + branch = truncator.truncate("branch", get_head(prop["path"])) + dirty, staged, untracked, stashed, situ = _get_repo_status(prop) symbols = get_symbols() - info = f"{branch:<10} [{symbols[dirty]+symbols[staged]+symbols[untracked]+symbols[situ]}]" + info = f"{branch:<10} {truncator.truncate('symbols', f'[{symbols[dirty]}{symbols[staged]}{symbols[stashed]}{symbols[untracked]}{symbols[situ]}]')}" if no_colors: return f"{info:<18}" @@ -236,11 +285,11 @@ def get_repo_status(prop: Dict[str, str], no_colors=False) -> str: return f"{color}{info:<18}{Color.end}" -def get_repo_branch(prop: Dict[str, str]) -> str: - return get_head(prop["path"]) +def get_repo_branch(prop: Dict[str, str], truncator: Truncate) -> str: + return truncator.truncate("branch_name", get_head(prop["path"])) -def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]: +def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str, str]: """ Return the status of one repo """ @@ -249,6 +298,7 @@ def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]: dirty = "dirty" if run_quiet_diff(flags, [], path) else "" staged = "staged" if run_quiet_diff(flags, ["--cached"], path) else "" untracked = "untracked" if has_untracked(flags, path) else "" + stashed = "stashed" if has_stashed(flags, path) else "" diff_returncode = run_quiet_diff(flags, ["@{u}", "@{0}"], path) if diff_returncode == 128: @@ -263,7 +313,7 @@ def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]: situ = "diverged" if diverged else "remote_ahead" else: # local is ahead of remote situ = "local_ahead" - return dirty, staged, untracked, situ + return dirty, staged, untracked, stashed, situ ALL_INFO_ITEMS = { diff --git a/gita/io.py b/gita/io.py new file mode 100644 index 0000000..78665b7 --- /dev/null +++ b/gita/io.py @@ -0,0 +1,33 @@ +import os +import csv +from typing import Tuple + + +def parse_clone_config(fname: str) -> Tuple: + """ + Return the repo information (url, name, path, type) and group information + (, name, path, repos) saved in `fname`. + """ + repos = {} + groups = {} + if os.path.isfile(fname) and os.stat(fname).st_size > 0: + with open(fname) as f: + rows = csv.DictReader( + f, ["url", "name", "path", "type", "flags"], restval="" + ) # it's actually a reader + for r in rows: + if r["url"]: + repos[r["name"]] = { + "path": r["path"], + "type": r["type"], + "flags": r["flags"].split(), + "url": r["url"], + } + else: + groups[r["name"]] = { + "path": r["path"], + "repos": [ + repo for repo in r["type"].split("|") if repo in repos + ], + } + return repos, groups diff --git a/gita/utils.py b/gita/utils.py index 6746d7f..f7717e0 100644 --- a/gita/utils.py +++ b/gita/utils.py @@ -7,7 +7,7 @@ import platform import subprocess from functools import lru_cache from pathlib import Path -from typing import List, Dict, Coroutine, Union, Iterator, Tuple +from typing import List, Dict, Coroutine, Union, Tuple from collections import Counter, defaultdict from concurrent.futures import ThreadPoolExecutor import multiprocessing @@ -367,15 +367,6 @@ def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str, return new_groups -def parse_clone_config(fname: str) -> Iterator[List[str]]: - """ - Return the url, name, and path of all repos in `fname`. - """ - with open(fname) as f: - for line in f: - yield line.strip().split(",") - - async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]: """ Run `cmds` asynchronously in `path` directory. Return the `path` if @@ -430,13 +421,14 @@ def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str: Return the status of all repos """ if repos: + truncator = info.Truncate() name_width = len(max(repos, key=len)) + 1 funcs = info.get_info_funcs(no_colors=no_colors) num_threads = min(multiprocessing.cpu_count(), len(repos)) with ThreadPoolExecutor(max_workers=num_threads) as executor: for line in executor.map( - lambda name: f'{name:<{name_width}}{" ".join(f(repos[name]) for f in funcs)}', + lambda name: f'{name:<{name_width}}{" ".join(f(repos[name], truncator) for f in funcs)}', sorted(repos), ): yield line |