Diffstat (limited to 'gita/utils.py')
-rw-r--r--  gita/utils.py  440
1 file changed, 364 insertions, 76 deletions
diff --git a/gita/utils.py b/gita/utils.py
index d14484a..6746d7f 100644
--- a/gita/utils.py
+++ b/gita/utils.py
@@ -1,61 +1,160 @@
+import sys
import os
-import yaml
+import json
+import csv
import asyncio
import platform
+import subprocess
from functools import lru_cache
-from typing import List, Dict, Coroutine, Union
+from pathlib import Path
+from typing import List, Dict, Coroutine, Union, Iterator, Tuple
+from collections import Counter, defaultdict
+from concurrent.futures import ThreadPoolExecutor
+import multiprocessing
from . import info
from . import common
-def get_config_fname(fname: str) -> str:
+MAX_INT = sys.maxsize
+
+
+def get_relative_path(kid: os.PathLike, parent: str) -> Union[List[str], None]:
"""
- Return the file name that stores the repo locations.
+    Return the relative path components if `kid` is relative to `parent`,
+    otherwise None.
+
+ Both the `kid` and `parent` should be absolute paths
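+
+    Example (illustrative, POSIX paths): get_relative_path("/a/b/c", "/a")
+    gives ["b", "c"], and get_relative_path("/a", "/a") gives [].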
"""
- root = common.get_config_dir()
- return os.path.join(root, fname)
+ if parent == "":
+ return None
+
+ p_kid = Path(kid)
+ # p_kid = Path(kid).resolve()
+ try:
+ p_rel = p_kid.relative_to(parent)
+ except ValueError:
+ return None
+ rel = str(p_rel).split(os.sep)
+ if rel == ["."]:
+ rel = []
+ return rel
@lru_cache()
-def get_repos() -> Dict[str, str]:
+def get_repos() -> Dict[str, Dict[str, str]]:
"""
- Return a `dict` of repo name to repo absolute path
+    Return a `dict` of repo name to repo properties (path, type, flags)
+
"""
- path_file = get_config_fname('repo_path')
+ path_file = common.get_config_fname("repos.csv")
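+    # each repos.csv row is: path,name,type,flags
+    # e.g. (a made-up row): /home/u/proj,proj,,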
repos = {}
- # Each line is a repo path and repo name separated by ,
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
with open(path_file) as f:
- for line in f:
- line = line.rstrip()
- if not line: # blank line
- continue
- path, name = line.split(',')
- if not is_git(path):
- continue
- if name not in repos:
- repos[name] = path
- else: # repo name collision for different paths: include parent path name
- par_name = os.path.basename(os.path.dirname(path))
- repos[os.path.join(par_name, name)] = path
+ rows = csv.DictReader(
+ f, ["path", "name", "type", "flags"], restval=""
+            )  # note: `rows` is a lazy reader, not a list of rows
+ repos = {
+ r["name"]: {
+ "path": r["path"],
+ "type": r["type"],
+ "flags": r["flags"].split(),
+ }
+ for r in rows
+ if is_git(r["path"], include_bare=True)
+ }
return repos
@lru_cache()
-def get_groups() -> Dict[str, List[str]]:
+def get_context() -> Union[Path, None]:
+ """
+ Return context file path, or None if not set. Note that if in auto context
+ mode, the return value is not auto.context but the resolved context,
+ which could be None.
+
+ """
+ config_dir = Path(common.get_config_dir())
+ matches = list(config_dir.glob("*.context"))
+ if len(matches) > 1:
+ print("Cannot have multiple .context file")
+ sys.exit(1)
+ if not matches:
+ return None
+ ctx = matches[0]
+ if ctx.stem == "auto":
+ # The context is set to be the group with minimal distance to cwd
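+        # e.g. (hypothetical) with cwd=/a/b/c and groups at /a and /a/b,
+        # /a/b wins: its relative distance 1 beats /a's distance 2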
+ candidate = None
+ min_dist = MAX_INT
+ for gname, prop in get_groups().items():
+ rel = get_relative_path(Path.cwd(), prop["path"])
+ if rel is None:
+ continue
+ d = len(rel)
+ if d < min_dist:
+ candidate = gname
+ min_dist = d
+ if not candidate:
+ ctx = None
+ else:
+ ctx = ctx.with_name(f"{candidate}.context")
+ return ctx
+
+
+@lru_cache()
+def get_groups() -> Dict[str, Dict[str, Union[str, List]]]:
"""
- Return a `dict` of group name to repo names.
+ Return a `dict` of group name to group properties such as repo names and
+ group path.
"""
- fname = get_config_fname('groups.yml')
+ fname = common.get_config_fname("groups.csv")
groups = {}
- # Each line is a repo path and repo name separated by ,
+ repos = get_repos()
+ # Each line is: group-name:repo1 repo2 repo3:group-path
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
- with open(fname, 'r') as f:
- groups = yaml.load(f, Loader=yaml.FullLoader)
+ with open(fname, "r") as f:
+ rows = csv.DictReader(
+ f, ["name", "repos", "path"], restval="", delimiter=":"
+ )
+ # filter out invalid repos
+ groups = {
+ r["name"]: {
+ "repos": [repo for repo in r["repos"].split() if repo in repos],
+ "path": r["path"],
+ }
+ for r in rows
+ }
return groups
+def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool:
+ """
+ Delete repo from groups
+ """
+ deleted = False
+ for name in groups:
+ try:
+ groups[name]["repos"].remove(repo)
+            except ValueError:
+ pass
+ else:
+ deleted = True
+ return deleted
+
+
+def replace_context(old: Union[Path, None], new: str):
+ """ """
+ auto = Path(common.get_config_dir()) / "auto.context"
+ if auto.exists():
+ old = auto
+
+ if new == "none": # delete
+ old and old.unlink()
+ elif old:
+        # old.rename(old.with_stem(new))  # Path.with_stem requires py3.9
+ old.rename(old.with_name(f"{new}.context"))
+ else:
+ Path(auto.with_name(f"{new}.context")).write_text("")
+
def get_choices() -> List[Union[str, None]]:
"""
@@ -72,67 +171,209 @@ def get_choices() -> List[Union[str, None]]:
return choices
-def is_git(path: str) -> bool:
+def is_submodule_repo(p: Path) -> bool:
+ """ """
+ if p.is_file() and ".git/modules" in p.read_text():
+ return True
+ return False
+
+
+def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool:
"""
Return True if the path is a git repo.
"""
+ if not os.path.exists(path):
+ return False
# An alternative is to call `git rev-parse --is-inside-work-tree`
# I don't see why that one is better yet.
- # For a regular git repo, .git is a folder, for a worktree repo, .git is a file.
- # However, git submodule repo also has .git as a file.
+ # For a regular git repo, .git is a folder. For a worktree repo and
+ # submodule repo, .git is a file.
# A more reliable way to differentiate regular and worktree repos is to
# compare the result of `git rev-parse --git-dir` and
# `git rev-parse --git-common-dir`
- loc = os.path.join(path, '.git')
+ loc = os.path.join(path, ".git")
# TODO: we can display the worktree repos in a different font.
- return os.path.exists(loc)
-
-
-def rename_repo(repos: Dict[str, str], repo: str, new_name: str):
+ if os.path.exists(loc):
+ if exclude_submodule and is_submodule_repo(Path(loc)):
+ return False
+ return True
+ if not include_bare:
+ return False
+ # detect bare repo
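+    # a bare repo (e.g. created with `git init --bare`) has no .git entry,
+    # so ask git directly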
+ got = subprocess.run(
+ "git rev-parse --is-bare-repository".split(),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ cwd=path,
+ )
+ if got.returncode == 0 and got.stdout == b"true\n":
+ return True
+ return False
+
+
+def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
"""
Write new repo name to file
"""
- path = repos[repo]
+ if new_name in repos:
+ print(f"{new_name} is already in use!")
+ return
+ prop = repos[repo]
del repos[repo]
- repos[new_name] = path
- write_to_repo_file(repos, 'w')
+ repos[new_name] = prop
+ write_to_repo_file(repos, "w")
+ groups = get_groups()
+ for g, values in groups.items():
+ members = values["repos"]
+ if repo in members:
+ members.remove(repo)
+ members.append(new_name)
+ groups[g]["repos"] = sorted(members)
+ write_to_groups_file(groups, "w")
-def write_to_repo_file(repos: Dict[str, str], mode: str):
+
+def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str):
"""
+ @param repos: each repo is {name: {properties}}
"""
- data = ''.join(f'{path},{name}\n' for name, path in repos.items())
- fname = get_config_fname('repo_path')
+ # The 3rd column is repo type; unused field
+ data = [
+ (prop["path"], name, "", " ".join(prop["flags"]))
+ for name, prop in repos.items()
+ ]
+ fname = common.get_config_fname("repos.csv")
os.makedirs(os.path.dirname(fname), exist_ok=True)
- with open(fname, mode) as f:
- f.write(data)
+ with open(fname, mode, newline="") as f:
+ writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ writer.writerows(data)
-def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
+# TODO: combine with the repo writer
+def write_to_groups_file(groups: Dict[str, Dict], mode: str):
+ """ """
+ fname = common.get_config_fname("groups.csv")
+ os.makedirs(os.path.dirname(fname), exist_ok=True)
+ if not groups: # all groups are deleted
+ Path(fname).write_text("")
+ else:
+ # delete the group if there are no repos
+ for name in list(groups):
+ if not groups[name]["repos"]:
+ del groups[name]
+ with open(fname, mode, newline="") as f:
+ data = [
+ (group, " ".join(prop["repos"]), prop["path"])
+ for group, prop in groups.items()
+ ]
+ writer = csv.writer(
+ f, delimiter=":", quotechar='"', quoting=csv.QUOTE_MINIMAL
+ )
+ writer.writerows(data)
+
+
+def _make_name(
+ path: str, repos: Dict[str, Dict[str, str]], name_counts: Counter
+) -> str:
"""
+ Given a new repo `path`, create a repo name. By default, basename is used.
+ If name collision exists, further include parent path name.
+ @param path: It should not be in `repos` and is absolute
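+
+    Example (hypothetical): if the basename of /home/u/work/proj collides,
+    the generated name becomes "work/proj".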
"""
- fname = get_config_fname('groups.yml')
- os.makedirs(os.path.dirname(fname), exist_ok=True)
- with open(fname, mode) as f:
- yaml.dump(groups, f, default_flow_style=None)
-
-
-def add_repos(repos: Dict[str, str], new_paths: List[str]):
+ name = os.path.basename(os.path.normpath(path))
+ if name in repos or name_counts[name] > 1:
+ # path has no trailing /
+ par_name = os.path.basename(os.path.dirname(path))
+ return os.path.join(par_name, name)
+ return name
+
+
+def add_repos(
+ repos: Dict[str, Dict[str, str]],
+ new_paths: List[str],
+ include_bare=False,
+ exclude_submodule=False,
+ dry_run=False,
+) -> Dict[str, Dict[str, str]]:
"""
- Write new repo paths to file
+ Write new repo paths to file; return the added repos.
+
+    @param repos: name -> repo properties
"""
- existing_paths = set(repos.values())
- new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p))
+ existing_paths = {prop["path"] for prop in repos.values()}
+ new_paths = {p for p in new_paths if is_git(p, include_bare, exclude_submodule)}
new_paths = new_paths - existing_paths
+ new_repos = {}
if new_paths:
print(f"Found {len(new_paths)} new repo(s).")
+ if dry_run:
+ for p in new_paths:
+ print(p)
+ return {}
+ name_counts = Counter(os.path.basename(os.path.normpath(p)) for p in new_paths)
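+        # e.g. (hypothetical) /x/proj and /y/proj both have basename "proj",
+        # so name_counts["proj"] == 2 and both get parent-qualified names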
new_repos = {
- os.path.basename(os.path.normpath(path)): path
- for path in new_paths}
- write_to_repo_file(new_repos, 'a+')
+ _make_name(path, repos, name_counts): {
+ "path": path,
+ "flags": "",
+ }
+ for path in new_paths
+ }
+ write_to_repo_file(new_repos, "a+")
+ else:
+ print("No new repos found!")
+ return new_repos
+
+
+def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[Tuple[str, ...], str]:
+ """
+ Return relative parent strings, and the parent head string
+
+    For example, if `repo_path` is /a/b/c/d/here and one of `paths` is /a/b,
+    then return ("b", "c", "d") and the head "/a".
+ """
+ for p in paths:
+        rel = get_relative_path(repo_path, p)
+        if rel is not None:
+            rel = rel[:-1]
+            break
else:
- print('No new repos found!')
+ return (), ""
+ head, tail = os.path.split(p)
+ return (tail, *rel), head
+
+
+def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str, Dict]:
+ """
+    Group repos by the directory structure of their paths under `paths`.
+
+    @param repos: repos to be grouped
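+
+    Example (hypothetical): a repo at /a/b/c/repo grouped under the path /a/b
+    is added to the nested groups "b" and "b-c".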
+ """
+ # FIXME: the upstream code should make sure that paths are all independent
+ # i.e., each repo should be contained in one and only one path
+ new_groups = defaultdict(dict)
+ for repo_name, prop in repos.items():
+ hash, head = _generate_dir_hash(prop["path"], paths)
+ if not hash:
+ continue
+ for i in range(1, len(hash) + 1):
+ group_name = "-".join(hash[:i])
+            g_prop = new_groups[group_name]
+            g_prop["path"] = os.path.join(head, *hash[:i])
+            if "repos" not in g_prop:
+                g_prop["repos"] = [repo_name]
+            else:
+                g_prop["repos"].append(repo_name)
+ # FIXME: need to make sure the new group names don't clash with old ones
+ # or repo names
+ return new_groups
+
+
+def parse_clone_config(fname: str) -> Iterator[List[str]]:
+ """
+ Return the url, name, and path of all repos in `fname`.
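+    Each line is "url,name,path"; a made-up example:
+    https://example.com/repo.git,repo,/tmp/repo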
+ """
+ with open(fname) as f:
+ for line in f:
+ yield line.strip().split(",")
async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]:
@@ -140,17 +381,19 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s
Run `cmds` asynchronously in `path` directory. Return the `path` if
execution fails.
"""
+ # TODO: deprecated since 3.8, will be removed in 3.10
process = await asyncio.create_subprocess_exec(
*cmds,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
start_new_session=True,
- cwd=path)
+ cwd=path,
+ )
stdout, stderr = await process.communicate()
for pipe in (stdout, stderr):
if pipe:
- print(format_output(pipe.decode(), f'{repo_name}: '))
+ print(format_output(pipe.decode(), repo_name))
# The existence of stderr is not a good indicator since git sometimes writes
# to stderr even when the execution is successful, e.g. git fetch
if process.returncode != 0:
@@ -161,7 +404,7 @@ def format_output(s: str, prefix: str):
"""
Prepends every line in the given string with the given prefix.
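+    Example: format_output("a\nb\n", "repo") returns "repo: a\nrepo: b\n".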
"""
- return ''.join([f'{prefix}{line}' for line in s.splitlines(keepends=True)])
+ return "".join([f"{prefix}: {line}" for line in s.splitlines(keepends=True)])
def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
@@ -169,7 +412,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
Execute tasks asynchronously
"""
# TODO: asyncio API is nicer in python 3.7
- if platform.system() == 'Windows':
+ if platform.system() == "Windows":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
@@ -182,17 +425,21 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
return errors
-def describe(repos: Dict[str, str]) -> str:
+def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
"""
Return the status of all repos
"""
if repos:
- name_width = max(len(n) for n in repos) + 1
- funcs = info.get_info_funcs()
- for name in sorted(repos):
- path = repos[name]
- display_items = ' '.join(f(path) for f in funcs)
- yield f'{name:<{name_width}}{display_items}'
+ name_width = len(max(repos, key=len)) + 1
+ funcs = info.get_info_funcs(no_colors=no_colors)
+
+ num_threads = min(multiprocessing.cpu_count(), len(repos))
+ with ThreadPoolExecutor(max_workers=num_threads) as executor:
+ for line in executor.map(
+ lambda name: f'{name:<{name_width}}{" ".join(f(repos[name]) for f in funcs)}',
+ sorted(repos),
+ ):
+ yield line
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
@@ -208,18 +455,59 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
}
"""
# default config file
- fname = os.path.join(os.path.dirname(__file__), "cmds.yml")
- with open(fname, 'r') as stream:
- cmds = yaml.load(stream, Loader=yaml.FullLoader)
+ fname = os.path.join(os.path.dirname(__file__), "cmds.json")
+ with open(fname, "r") as f:
+ cmds = json.load(f)
# custom config file
root = common.get_config_dir()
- fname = os.path.join(root, 'cmds.yml')
+ fname = os.path.join(root, "cmds.json")
custom_cmds = {}
if os.path.isfile(fname) and os.path.getsize(fname):
- with open(fname, 'r') as stream:
- custom_cmds = yaml.load(stream, Loader=yaml.FullLoader)
+ with open(fname, "r") as f:
+ custom_cmds = json.load(f)
# custom commands shadow default ones
cmds.update(custom_cmds)
return cmds
+
+
+def parse_repos_and_rest(
+ input: List[str],
+ quote_mode=False,
+) -> Tuple[Dict[str, Dict[str, str]], List[str]]:
+ """
+ Parse gita input arguments
+
+ @return: repos and the rest (e.g., gita shell and super commands)
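+
+    Example (hypothetical): for input ["repo1", "grp2", "diff"], where repo1
+    is a repo and grp2 a group, return their repos and the rest ["diff"].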
+ """
+ i = 0
+ names = []
+ repos = get_repos()
+ groups = get_groups()
+ ctx = get_context()
+ for i, word in enumerate(input):
+ if word in repos or word in groups:
+ names.append(word)
+ else:
+ break
+ else: # all input is repos and groups, shift the index once more
+ if i is not None:
+ i += 1
+ if not names and ctx:
+ names = [ctx.stem]
+ if quote_mode and i + 1 != len(input):
+ print(input[i], "is not a repo or group")
+ sys.exit(2)
+
+ if names:
+ chosen = {}
+ for k in names:
+ if k in repos:
+ chosen[k] = repos[k]
+ if k in groups:
+ for r in groups[k]["repos"]:
+ chosen[r] = repos[r]
+ # if not set here, all repos are chosen
+ repos = chosen
+ return repos, input[i:]