summaryrefslogtreecommitdiffstats
path: root/gita
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-17 09:07:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-17 09:07:14 +0000
commitfcffaf214966be45ca78fecc6aaa5fae0b42d05a (patch)
tree666b8c46f7fdbb3a78947221170909a0ccafbb41 /gita
parentAdding upstream version 0.16.6.1. (diff)
downloadgita-fcffaf214966be45ca78fecc6aaa5fae0b42d05a.tar.xz
gita-fcffaf214966be45ca78fecc6aaa5fae0b42d05a.zip
Adding upstream version 0.16.7.2.upstream/0.16.7.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'gita')
-rw-r--r--gita/__init__.py19
-rw-r--r--gita/__main__.py77
-rw-r--r--gita/cmds.json11
-rw-r--r--gita/info.py78
-rw-r--r--gita/io.py33
-rw-r--r--gita/utils.py14
6 files changed, 185 insertions, 47 deletions
diff --git a/gita/__init__.py b/gita/__init__.py
index 33c0f41..de4873d 100644
--- a/gita/__init__.py
+++ b/gita/__init__.py
@@ -1,3 +1,18 @@
-import pkg_resources
+import sys
-__version__ = pkg_resources.get_distribution("gita").version
+
+def get_version() -> str:
+ try:
+ import pkg_resources
+ except ImportError:
+ try:
+ from importlib.metadata import version
+ except ImportError:
+ print("cannot determine version", sys.version_info)
+ else:
+ return version("gita")
+ else:
+ return pkg_resources.get_distribution("gita").version
+
+
+__version__ = get_version()
diff --git a/gita/__main__.py b/gita/__main__.py
index b2bc32b..6809f15 100644
--- a/gita/__main__.py
+++ b/gita/__main__.py
@@ -18,14 +18,15 @@ import os
import sys
import csv
import argparse
+import argcomplete
import subprocess
from functools import partial
-import pkg_resources
from itertools import chain
from pathlib import Path
import glob
+from typing import Dict, Optional
-from . import utils, info, common
+from . import utils, info, common, io, get_version
def _group_name(name: str, exclude_old_names=True) -> str:
@@ -146,13 +147,28 @@ def f_info(args: argparse.Namespace):
with open(csv_config, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(to_display)
+ elif cmd == "set-length":
+ csv_config = common.get_config_fname("layout.csv")
+ print(f"Settings are in {csv_config}")
+ defaults = {
+ "branch": 19,
+ "symbols": 5,
+ "branch_name": 27,
+ "commit_msg": 0,
+ "commit_time": 0, # 0 means no limit
+ "path": 30,
+ }
+ with open(csv_config, "w", newline="") as f:
+ writer = csv.DictWriter(f, fieldnames=defaults)
+ writer.writeheader()
+ writer.writerow(defaults)
def f_clone(args: argparse.Namespace):
if args.dry_run:
if args.from_file:
- for url, repo_name, abs_path in utils.parse_clone_config(args.clonee):
+ for url, repo_name, abs_path in io.parse_clone_config(args.clonee):
print(f"git clone {url} {abs_path}")
else:
print(f"git clone {args.clonee}")
@@ -172,28 +188,35 @@ def f_clone(args: argparse.Namespace):
f_add(args)
return
+ # TODO: add repos to group too
+ repos, groups = io.parse_clone_config(args.clonee)
if args.preserve_path:
utils.exec_async_tasks(
- utils.run_async(repo_name, path, ["git", "clone", url, abs_path])
- for url, repo_name, abs_path in utils.parse_clone_config(args.clonee)
+ utils.run_async(repo_name, path, ["git", "clone", r["url"], r["path"]])
+ for repo_name, r in repos.items()
)
else:
utils.exec_async_tasks(
- utils.run_async(repo_name, path, ["git", "clone", url])
- for url, repo_name, _ in utils.parse_clone_config(args.clonee)
+ utils.run_async(repo_name, path, ["git", "clone", r["url"]])
+ for repo_name, r in repos.items()
)
def f_freeze(args):
- repos = utils.get_repos()
+ """
+ print repo and group information for future cloning
+ """
ctx = utils.get_context()
if args.group is None and ctx:
args.group = ctx.stem
+ repos = utils.get_repos()
+ group_name = args.group
group_repos = None
- if args.group: # only display repos in this group
- group_repos = utils.get_groups()[args.group]["repos"]
+ if group_name: # only display repos in this group
+ group_repos = utils.get_groups()[group_name]["repos"]
repos = {k: repos[k] for k in group_repos if k in repos}
seen = {""}
+ # print(repos)
for name, prop in repos.items():
path = prop["path"]
url = ""
@@ -211,7 +234,16 @@ def f_freeze(args):
url = parts[1]
if url not in seen:
seen.add(url)
- print(f"{url},{name},{path}")
+ # TODO: add another field to distinguish regular repo or worktree or submodule
+ print(f"{url},{name},{path},")
+ # group information: these lines don't have URL
+ if group_name:
+ group_path = utils.get_groups()[group_name]["path"]
+ print(f",{group_name},{group_path},{'|'.join(group_repos)}")
+ else: # show all groups
+ for gname, g in utils.get_groups().items():
+ group_repos = "|".join(g["repos"])
+ print(f",{gname},{g['path']},{group_repos}")
def f_ll(args: argparse.Namespace):
@@ -435,8 +467,9 @@ def main(argv=None):
title="sub-commands", help="additional help with sub-command -h"
)
- version = pkg_resources.require("gita")[0].version
- p.add_argument("-v", "--version", action="version", version=f"%(prog)s {version}")
+ p.add_argument(
+ "-v", "--version", action="version", version=f"%(prog)s {get_version()}"
+ )
# bookkeeping sub-commands
p_add = subparsers.add_parser("add", description="add repo(s)", help="add repo(s)")
@@ -598,11 +631,17 @@ def main(argv=None):
info_cmds.add_parser("rm", description="Disable information item.").add_argument(
"info_item", choices=info.ALL_INFO_ITEMS, help="information item to delete"
)
+ info_cmds.add_parser(
+ "set-length",
+ description="Set default column widths for information items. "
+ "The settings are in layout.csv",
+ )
ll_doc = f""" status symbols:
+: staged changes
*: unstaged changes
- _: untracked files/folders
+ ?: untracked files/folders
+ $: stashed changes
branch colors:
{info.Color.white}white{info.Color.end}: local has no remote
@@ -780,17 +819,18 @@ def main(argv=None):
cmds = utils.get_cmds_from_files()
for name, data in cmds.items():
help = data.get("help")
+ repo_help = help
cmd = data["cmd"]
if data.get("allow_all"):
choices = utils.get_choices()
nargs = "*"
- help += " for all repos or"
+ repo_help += " for all repos or"
else:
choices = utils.get_repos().keys() | utils.get_groups().keys()
nargs = "+"
- help += " for the chosen repo(s) or group(s)"
- sp = subparsers.add_parser(name, description=help)
- sp.add_argument("repo", nargs=nargs, choices=choices, help=help)
+ repo_help += " for the chosen repo(s) or group(s)"
+ sp = subparsers.add_parser(name, description=help, help=help)
+ sp.add_argument("repo", nargs=nargs, choices=choices, help=repo_help)
is_shell = bool(data.get("shell"))
sp.add_argument(
"-s",
@@ -805,6 +845,7 @@ def main(argv=None):
cmd = cmd.split()
sp.set_defaults(func=f_git_cmd, cmd=cmd)
+ argcomplete.autocomplete(p)
args = p.parse_args(argv)
args.async_blacklist = {
diff --git a/gita/cmds.json b/gita/cmds.json
index eadda81..c1890c5 100644
--- a/gita/cmds.json
+++ b/gita/cmds.json
@@ -22,8 +22,13 @@
"cmd": "git log -1 HEAD",
"help": "show log information of HEAD"
},
-"log":
- {"cmd": "git log",
+"lo":{
+ "cmd": "git log --oneline -7",
+ "allow_all": true,
+ "help": "show one-line log for the latest 7 commits"
+ },
+"log":{
+ "cmd": "git log",
"disable_async": true,
"help": "show logs"
},
@@ -77,10 +82,12 @@
},
"stat":{
"cmd": "git diff --stat",
+ "allow_all": true,
"help": "show edit statistics"
},
"st":{
"cmd": "git status",
+ "allow_all": true,
"help": "show status"
},
"tag":{
diff --git a/gita/info.py b/gita/info.py
index 10d8bea..0a253a0 100644
--- a/gita/info.py
+++ b/gita/info.py
@@ -9,6 +9,40 @@ from typing import Tuple, List, Callable, Dict
from . import common
+class Truncate:
+ """
+ Reads in user layout.csv file and uses the values there
+ to truncate the string passed in. If the file doesn't
+ exist or the requested field doesn't exist then don't
+ truncate
+ """
+
+ widths = {}
+
+ def __init__(self):
+ csv_config = Path(common.get_config_fname("layout.csv"))
+ if csv_config.is_file():
+ with open(csv_config, "r") as f:
+ reader = csv.DictReader(f)
+ self.widths = next(reader)
+
+ # Ensure the Dict type is Dict[str, int] to reduce casting elsewhere
+ for e, width in self.widths.items():
+ self.widths[e] = int(width)
+
+ def truncate(self, field: str, message: str):
+ # 0 means no width limit applied
+ if not self.widths.get(field):
+ return message
+
+ length = 3 if self.widths[field] < 3 else self.widths[field]
+ return (
+ message[: length - 3] + "..."
+ if len(message) > length
+ else message.ljust(length)
+ )
+
+
class Color(Enum):
"""
Terminal color
@@ -113,8 +147,8 @@ def get_info_items() -> List[str]:
return display_items
-def get_path(prop: Dict[str, str]) -> str:
- return f'{Color.cyan}{prop["path"]}{Color.end}'
+def get_path(prop: Dict[str, str], truncator: Truncate) -> str:
+ return f'{Color.cyan}{truncator.truncate("path", prop["path"])}{Color.end}'
# TODO: do we need to add the flags here too?
@@ -164,7 +198,21 @@ def has_untracked(flags: List[str], path) -> bool:
return bool(result.stdout)
-def get_commit_msg(prop: Dict[str, str]) -> str:
+def has_stashed(flags: List[str], path) -> bool:
+ """
+ Return True if stashed content exists
+ """
+ # FIXME: this doesn't work for repos like worktrees, bare, etc
+ p = Path(path) / ".git" / "logs" / "refs" / "stash"
+ got = False
+ try:
+ got = p.is_file()
+ except Exception:
+ pass
+ return got
+
+
+def get_commit_msg(prop: Dict[str, str], truncator: Truncate) -> str:
"""
Return the last commit message.
"""
@@ -177,10 +225,10 @@ def get_commit_msg(prop: Dict[str, str]) -> str:
universal_newlines=True,
cwd=prop["path"],
)
- return result.stdout.strip()
+ return truncator.truncate("commit_msg", result.stdout.strip())
-def get_commit_time(prop: Dict[str, str]) -> str:
+def get_commit_time(prop: Dict[str, str], truncator: Truncate) -> str:
"""
Return the last commit time in parenthesis.
"""
@@ -192,13 +240,14 @@ def get_commit_time(prop: Dict[str, str]) -> str:
universal_newlines=True,
cwd=prop["path"],
)
- return f"({result.stdout.strip()})"
+ return truncator.truncate("commit_time", f"({result.stdout.strip()})")
default_symbols = {
"dirty": "*",
"staged": "+",
"untracked": "?",
+ "stashed": "$",
"local_ahead": "↑",
"remote_ahead": "↓",
"diverged": "⇕",
@@ -223,11 +272,11 @@ def get_symbols() -> Dict[str, str]:
return default_symbols
-def get_repo_status(prop: Dict[str, str], no_colors=False) -> str:
- branch = get_head(prop["path"])
- dirty, staged, untracked, situ = _get_repo_status(prop)
+def get_repo_status(prop: Dict[str, str], truncator: Truncate, no_colors=False) -> str:
+ branch = truncator.truncate("branch", get_head(prop["path"]))
+ dirty, staged, untracked, stashed, situ = _get_repo_status(prop)
symbols = get_symbols()
- info = f"{branch:<10} [{symbols[dirty]+symbols[staged]+symbols[untracked]+symbols[situ]}]"
+ info = f"{branch:<10} {truncator.truncate('symbols', f'[{symbols[dirty]}{symbols[staged]}{symbols[stashed]}{symbols[untracked]}{symbols[situ]}]')}"
if no_colors:
return f"{info:<18}"
@@ -236,11 +285,11 @@ def get_repo_status(prop: Dict[str, str], no_colors=False) -> str:
return f"{color}{info:<18}{Color.end}"
-def get_repo_branch(prop: Dict[str, str]) -> str:
- return get_head(prop["path"])
+def get_repo_branch(prop: Dict[str, str], truncator: Truncate) -> str:
+ return truncator.truncate("branch_name", get_head(prop["path"]))
-def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]:
+def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str, str]:
"""
Return the status of one repo
"""
@@ -249,6 +298,7 @@ def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]:
dirty = "dirty" if run_quiet_diff(flags, [], path) else ""
staged = "staged" if run_quiet_diff(flags, ["--cached"], path) else ""
untracked = "untracked" if has_untracked(flags, path) else ""
+ stashed = "stashed" if has_stashed(flags, path) else ""
diff_returncode = run_quiet_diff(flags, ["@{u}", "@{0}"], path)
if diff_returncode == 128:
@@ -263,7 +313,7 @@ def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]:
situ = "diverged" if diverged else "remote_ahead"
else: # local is ahead of remote
situ = "local_ahead"
- return dirty, staged, untracked, situ
+ return dirty, staged, untracked, stashed, situ
ALL_INFO_ITEMS = {
diff --git a/gita/io.py b/gita/io.py
new file mode 100644
index 0000000..78665b7
--- /dev/null
+++ b/gita/io.py
@@ -0,0 +1,33 @@
+import os
+import csv
+from typing import Tuple
+
+
+def parse_clone_config(fname: str) -> Tuple:
+ """
+ Return the repo information (url, name, path, type) and group information
+ (, name, path, repos) saved in `fname`.
+ """
+ repos = {}
+ groups = {}
+ if os.path.isfile(fname) and os.stat(fname).st_size > 0:
+ with open(fname) as f:
+ rows = csv.DictReader(
+ f, ["url", "name", "path", "type", "flags"], restval=""
+ ) # it's actually a reader
+ for r in rows:
+ if r["url"]:
+ repos[r["name"]] = {
+ "path": r["path"],
+ "type": r["type"],
+ "flags": r["flags"].split(),
+ "url": r["url"],
+ }
+ else:
+ groups[r["name"]] = {
+ "path": r["path"],
+ "repos": [
+ repo for repo in r["type"].split("|") if repo in repos
+ ],
+ }
+ return repos, groups
diff --git a/gita/utils.py b/gita/utils.py
index 6746d7f..f7717e0 100644
--- a/gita/utils.py
+++ b/gita/utils.py
@@ -7,7 +7,7 @@ import platform
import subprocess
from functools import lru_cache
from pathlib import Path
-from typing import List, Dict, Coroutine, Union, Iterator, Tuple
+from typing import List, Dict, Coroutine, Union, Tuple
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
@@ -367,15 +367,6 @@ def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str,
return new_groups
-def parse_clone_config(fname: str) -> Iterator[List[str]]:
- """
- Return the url, name, and path of all repos in `fname`.
- """
- with open(fname) as f:
- for line in f:
- yield line.strip().split(",")
-
-
async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]:
"""
Run `cmds` asynchronously in `path` directory. Return the `path` if
@@ -430,13 +421,14 @@ def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
Return the status of all repos
"""
if repos:
+ truncator = info.Truncate()
name_width = len(max(repos, key=len)) + 1
funcs = info.get_info_funcs(no_colors=no_colors)
num_threads = min(multiprocessing.cpu_count(), len(repos))
with ThreadPoolExecutor(max_workers=num_threads) as executor:
for line in executor.map(
- lambda name: f'{name:<{name_width}}{" ".join(f(repos[name]) for f in funcs)}',
+ lambda name: f'{name:<{name_width}}{" ".join(f(repos[name], truncator) for f in funcs)}',
sorted(repos),
):
yield line