Diffstat (limited to 'gita/utils.py')
-rw-r--r-- | gita/utils.py | 440
1 file changed, 364 insertions, 76 deletions
diff --git a/gita/utils.py b/gita/utils.py
index d14484a..6746d7f 100644
--- a/gita/utils.py
+++ b/gita/utils.py
@@ -1,61 +1,160 @@
+import sys
 import os
-import yaml
+import json
+import csv
 import asyncio
 import platform
+import subprocess
 from functools import lru_cache
-from typing import List, Dict, Coroutine, Union
+from pathlib import Path
+from typing import List, Dict, Coroutine, Union, Iterator, Tuple
+from collections import Counter, defaultdict
+from concurrent.futures import ThreadPoolExecutor
+import multiprocessing

 from . import info
 from . import common


-def get_config_fname(fname: str) -> str:
+MAX_INT = sys.maxsize
+
+
+def get_relative_path(kid: os.PathLike, parent: str) -> Union[List[str], None]:
     """
-    Return the file name that stores the repo locations.
+    Return the relative path components if `kid` is under `parent`,
+    otherwise None.
+
+    Both `kid` and `parent` should be absolute paths
     """
-    root = common.get_config_dir()
-    return os.path.join(root, fname)
+    if parent == "":
+        return None
+
+    p_kid = Path(kid)
+    # p_kid = Path(kid).resolve()
+    try:
+        p_rel = p_kid.relative_to(parent)
+    except ValueError:
+        return None
+    rel = str(p_rel).split(os.sep)
+    if rel == ["."]:
+        rel = []
+    return rel


 @lru_cache()
-def get_repos() -> Dict[str, str]:
+def get_repos() -> Dict[str, Dict[str, str]]:
     """
-    Return a `dict` of repo name to repo absolute path
+    Return a `dict` of repo name to repo absolute path and repo type
     """
-    path_file = get_config_fname('repo_path')
+    path_file = common.get_config_fname("repos.csv")
     repos = {}
-    # Each line is a repo path and repo name separated by ,
     if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
         with open(path_file) as f:
-            for line in f:
-                line = line.rstrip()
-                if not line:  # blank line
-                    continue
-                path, name = line.split(',')
-                if not is_git(path):
-                    continue
-                if name not in repos:
-                    repos[name] = path
-                else:  # repo name collision for different paths: include parent path name
-                    par_name = os.path.basename(os.path.dirname(path))
-                    repos[os.path.join(par_name, name)] = path
+            rows = csv.DictReader(
+                f, ["path", "name", "type", "flags"], restval=""
+            )  # `rows` is a lazy csv reader, not a list
+            repos = {
+                r["name"]: {
+                    "path": r["path"],
+                    "type": r["type"],
+                    "flags": r["flags"].split(),
+                }
+                for r in rows
+                if is_git(r["path"], include_bare=True)
+            }
     return repos


 @lru_cache()
-def get_groups() -> Dict[str, List[str]]:
+def get_context() -> Union[Path, None]:
+    """
+    Return the context file path, or None if not set. Note that in auto
+    context mode, the return value is not auto.context but the resolved
+    context, which could be None.
+    """
+    config_dir = Path(common.get_config_dir())
+    matches = list(config_dir.glob("*.context"))
+    if len(matches) > 1:
+        print("Cannot have multiple .context files")
+        sys.exit(1)
+    if not matches:
+        return None
+    ctx = matches[0]
+    if ctx.stem == "auto":
+        # The context is set to be the group with minimal distance to cwd
+        candidate = None
+        min_dist = MAX_INT
+        for gname, prop in get_groups().items():
+            rel = get_relative_path(Path.cwd(), prop["path"])
+            if rel is None:
+                continue
+            d = len(rel)
+            if d < min_dist:
+                candidate = gname
+                min_dist = d
+        if not candidate:
+            ctx = None
+        else:
+            ctx = ctx.with_name(f"{candidate}.context")
+    return ctx
+
+
+@lru_cache()
+def get_groups() -> Dict[str, Dict[str, Union[str, List]]]:
     """
-    Return a `dict` of group name to repo names.
+    Return a `dict` of group name to group properties such as repo names and
+    group path.
     """
-    fname = get_config_fname('groups.yml')
+    fname = common.get_config_fname("groups.csv")
     groups = {}
-    # Each line is a repo path and repo name separated by ,
+    repos = get_repos()
+    # Each line is: group-name:repo1 repo2 repo3:group-path
    if os.path.isfile(fname) and os.stat(fname).st_size > 0:
-        with open(fname, 'r') as f:
-            groups = yaml.load(f, Loader=yaml.FullLoader)
+        with open(fname, "r") as f:
+            rows = csv.DictReader(
+                f, ["name", "repos", "path"], restval="", delimiter=":"
+            )
+            # filter out invalid repos
+            groups = {
+                r["name"]: {
+                    "repos": [repo for repo in r["repos"].split() if repo in repos],
+                    "path": r["path"],
+                }
+                for r in rows
+            }
     return groups


+def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool:
+    """
+    Delete repo from groups
+    """
+    deleted = False
+    for name in groups:
+        try:
+            groups[name]["repos"].remove(repo)
+        except ValueError:
+            pass
+        else:
+            deleted = True
+    return deleted
+
+
+def replace_context(old: Union[Path, None], new: str):
+    """
+    Rename the current context file to `new`.context, or delete it if `new`
+    is 'none'.
+    """
+    auto = Path(common.get_config_dir()) / "auto.context"
+    if auto.exists():
+        old = auto
+
+    if new == "none":  # delete
+        old and old.unlink()
+    elif old:
+        # ctx.rename(ctx.with_stem(new_name))  # only works in py3.9
+        old.rename(old.with_name(f"{new}.context"))
+    else:
+        Path(auto.with_name(f"{new}.context")).write_text("")
+
+
 def get_choices() -> List[Union[str, None]]:
     """
@@ -72,67 +171,209 @@ def get_choices() -> List[Union[str, None]]:
     return choices


-def is_git(path: str) -> bool:
+def is_submodule_repo(p: Path) -> bool:
+    """
+    Return True if `p` is the .git file of a submodule repo.
+    """
+    if p.is_file() and ".git/modules" in p.read_text():
+        return True
+    return False
+
+
+def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool:
     """
     Return True if the path is a git repo.
     """
+    if not os.path.exists(path):
+        return False
     # An alternative is to call `git rev-parse --is-inside-work-tree`
     # I don't see why that one is better yet.
-    # For a regular git repo, .git is a folder, for a worktree repo, .git is a file.
-    # However, git submodule repo also has .git as a file.
+    # For a regular git repo, .git is a folder. For a worktree repo and
+    # submodule repo, .git is a file.
     # A more reliable way to differentiate regular and worktree repos is to
     # compare the result of `git rev-parse --git-dir` and
     # `git rev-parse --git-common-dir`
-    loc = os.path.join(path, '.git')
+    loc = os.path.join(path, ".git")
     # TODO: we can display the worktree repos in a different font.
-    return os.path.exists(loc)
-
-
-def rename_repo(repos: Dict[str, str], repo: str, new_name: str):
+    if os.path.exists(loc):
+        if exclude_submodule and is_submodule_repo(Path(loc)):
+            return False
+        return True
+    if not include_bare:
+        return False
+    # detect bare repo
+    got = subprocess.run(
+        "git rev-parse --is-bare-repository".split(),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.DEVNULL,
+        cwd=path,
+    )
+    if got.returncode == 0 and got.stdout == b"true\n":
+        return True
+    return False
+
+
+def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
     """
     Write new repo name to file
     """
-    path = repos[repo]
+    if new_name in repos:
+        print(f"{new_name} is already in use!")
+        return
+    prop = repos[repo]
     del repos[repo]
-    repos[new_name] = path
-    write_to_repo_file(repos, 'w')
+    repos[new_name] = prop
+    write_to_repo_file(repos, "w")
+    groups = get_groups()
+    for g, values in groups.items():
+        members = values["repos"]
+        if repo in members:
+            members.remove(repo)
+            members.append(new_name)
+            groups[g]["repos"] = sorted(members)
+    write_to_groups_file(groups, "w")


-def write_to_repo_file(repos: Dict[str, str], mode: str):
+def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str):
     """
+    @param repos: each repo is {name: {properties}}
     """
-    data = ''.join(f'{path},{name}\n' for name, path in repos.items())
-    fname = get_config_fname('repo_path')
+    # The 3rd column is repo type; unused field
+    data = [
+        (prop["path"], name, "", " ".join(prop["flags"]))
+        for name, prop in repos.items()
+    ]
+    fname = common.get_config_fname("repos.csv")
     os.makedirs(os.path.dirname(fname), exist_ok=True)
-    with open(fname, mode) as f:
-        f.write(data)
+    with open(fname, mode, newline="") as f:
+        writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
+        writer.writerows(data)


-def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
+# TODO: combine with the repo writer
+def write_to_groups_file(groups: Dict[str, Dict], mode: str):
+    """
+    Write group data to the groups config file.
+    """
+    fname = common.get_config_fname("groups.csv")
+    os.makedirs(os.path.dirname(fname), exist_ok=True)
+    if not groups:  # all groups are deleted
+        Path(fname).write_text("")
+    else:
+        # delete the group if there are no repos
+        for name in list(groups):
+            if not groups[name]["repos"]:
+                del groups[name]
+        with open(fname, mode, newline="") as f:
+            data = [
+                (group, " ".join(prop["repos"]), prop["path"])
+                for group, prop in groups.items()
+            ]
+            writer = csv.writer(
+                f, delimiter=":", quotechar='"', quoting=csv.QUOTE_MINIMAL
+            )
+            writer.writerows(data)
+
+
+def _make_name(
+    path: str, repos: Dict[str, Dict[str, str]], name_counts: Counter
+) -> str:
     """
+    Given a new repo `path`, create a repo name. By default, basename is used.
+    If name collision exists, further include parent path name.
+
+    @param path: It should not be in `repos` and should be absolute
     """
-    fname = get_config_fname('groups.yml')
-    os.makedirs(os.path.dirname(fname), exist_ok=True)
-    with open(fname, mode) as f:
-        yaml.dump(groups, f, default_flow_style=None)
-
-
-def add_repos(repos: Dict[str, str], new_paths: List[str]):
+    name = os.path.basename(os.path.normpath(path))
+    if name in repos or name_counts[name] > 1:
+        # path has no trailing /
+        par_name = os.path.basename(os.path.dirname(path))
+        return os.path.join(par_name, name)
+    return name
+
+
+def add_repos(
+    repos: Dict[str, Dict[str, str]],
+    new_paths: List[str],
+    include_bare=False,
+    exclude_submodule=False,
+    dry_run=False,
+) -> Dict[str, Dict[str, str]]:
     """
-    Write new repo paths to file
+    Write new repo paths to file; return the added repos.
+
+    @param repos: name -> path
     """
-    existing_paths = set(repos.values())
-    new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p))
+    existing_paths = {prop["path"] for prop in repos.values()}
+    new_paths = {p for p in new_paths if is_git(p, include_bare, exclude_submodule)}
     new_paths = new_paths - existing_paths
+    new_repos = {}
     if new_paths:
         print(f"Found {len(new_paths)} new repo(s).")
+        if dry_run:
+            for p in new_paths:
+                print(p)
+            return {}
+        name_counts = Counter(os.path.basename(os.path.normpath(p)) for p in new_paths)
         new_repos = {
-            os.path.basename(os.path.normpath(path)): path
-            for path in new_paths}
-        write_to_repo_file(new_repos, 'a+')
+            _make_name(path, repos, name_counts): {
+                "path": path,
+                "flags": "",
+            }
+            for path in new_paths
+        }
+        write_to_repo_file(new_repos, "a+")
+    else:
+        print("No new repos found!")
+    return new_repos
+
+
+def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[Tuple[str, ...], str]:
+    """
+    Return the relative parent components and the parent head string
+
+    For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b,
+    then return (b, c, d)
+    """
+    for p in paths:
+        rel = get_relative_path(repo_path, p)
+        if rel is not None:
+            break
     else:
-        print('No new repos found!')
+        return (), ""
+    head, tail = os.path.split(p)
+    return (tail, *rel[:-1]), head
+
+
+def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str, Dict]:
+    """
+    Generate groups from the parent folder structure of the repos.
+
+    @params repos: repos to be grouped
+    """
+    # FIXME: the upstream code should make sure that paths are all independent
+    #        i.e., each repo should be contained in one and only one path
+    new_groups = defaultdict(dict)
+    for repo_name, prop in repos.items():
+        hash, head = _generate_dir_hash(prop["path"], paths)
+        if not hash:
+            continue
+        for i in range(1, len(hash) + 1):
+            group_name = "-".join(hash[:i])
+            g_prop = new_groups[group_name]
+            g_prop["path"] = os.path.join(head, *hash[:i])
+            if "repos" not in g_prop:
+                g_prop["repos"] = [repo_name]
+            else:
+                g_prop["repos"].append(repo_name)
+    # FIXME: need to make sure the new group names don't clash with old ones
+    #        or repo names
+    return new_groups
+
+
+def parse_clone_config(fname: str) -> Iterator[List[str]]:
+    """
+    Return the url, name, and path of all repos in `fname`.
+    """
+    with open(fname) as f:
+        for line in f:
+            yield line.strip().split(",")


 async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]:
@@ -140,17 +381,19 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s
     Run `cmds` asynchronously in `path` directory. Return the `path` if
     execution fails.
     """
+    # TODO: deprecated since 3.8, will be removed in 3.10
     process = await asyncio.create_subprocess_exec(
         *cmds,
         stdin=asyncio.subprocess.DEVNULL,
         stdout=asyncio.subprocess.PIPE,
         stderr=asyncio.subprocess.PIPE,
         start_new_session=True,
-        cwd=path)
+        cwd=path,
+    )
     stdout, stderr = await process.communicate()
     for pipe in (stdout, stderr):
         if pipe:
-            print(format_output(pipe.decode(), f'{repo_name}: '))
+            print(format_output(pipe.decode(), repo_name))
     # The existence of stderr is not a good indicator since git sometimes
     # writes to stderr even if the execution is successful, e.g. git fetch
     if process.returncode != 0:
@@ -161,7 +404,7 @@ def format_output(s: str, prefix: str):
     """
     Prepends every line in given string with the given prefix.
     """
-    return ''.join([f'{prefix}{line}' for line in s.splitlines(keepends=True)])
+    return "".join([f"{prefix}: {line}" for line in s.splitlines(keepends=True)])


 def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
@@ -169,7 +412,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
     Execute tasks asynchronously
     """
     # TODO: asyncio API is nicer in python 3.7
-    if platform.system() == 'Windows':
+    if platform.system() == "Windows":
         loop = asyncio.ProactorEventLoop()
         asyncio.set_event_loop(loop)
     else:
@@ -182,17 +425,21 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
     return errors


-def describe(repos: Dict[str, str]) -> str:
+def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
     """
     Return the status of all repos
     """
     if repos:
-        name_width = max(len(n) for n in repos) + 1
-        funcs = info.get_info_funcs()
-        for name in sorted(repos):
-            path = repos[name]
-            display_items = ' '.join(f(path) for f in funcs)
-            yield f'{name:<{name_width}}{display_items}'
+        name_width = len(max(repos, key=len)) + 1
+        funcs = info.get_info_funcs(no_colors=no_colors)
+
+        num_threads = min(multiprocessing.cpu_count(), len(repos))
+        with ThreadPoolExecutor(max_workers=num_threads) as executor:
+            for line in executor.map(
+                lambda name: f'{name:<{name_width}}{" ".join(f(repos[name]) for f in funcs)}',
+                sorted(repos),
+            ):
+                yield line


 def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
@@ -208,18 +455,59 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
         }
     """
     # default config file
-    fname = os.path.join(os.path.dirname(__file__), "cmds.yml")
-    with open(fname, 'r') as stream:
-        cmds = yaml.load(stream, Loader=yaml.FullLoader)
+    fname = os.path.join(os.path.dirname(__file__), "cmds.json")
+    with open(fname, "r") as f:
+        cmds = json.load(f)

     # custom config file
     root = common.get_config_dir()
-    fname = os.path.join(root, 'cmds.yml')
+    fname = os.path.join(root, "cmds.json")
     custom_cmds = {}
     if os.path.isfile(fname) and os.path.getsize(fname):
-        with open(fname, 'r') as stream:
-            custom_cmds = yaml.load(stream, Loader=yaml.FullLoader)
+        with open(fname, "r") as f:
+            custom_cmds = json.load(f)

     # custom commands shadow default ones
     cmds.update(custom_cmds)
     return cmds
+
+
+def parse_repos_and_rest(
+    input: List[str],
+    quote_mode=False,
+) -> Tuple[Dict[str, Dict[str, str]], List[str]]:
+    """
+    Parse gita input arguments
+
+    @return: repos and the rest (e.g., gita shell and super commands)
+    """
+    i = 0
+    names = []
+    repos = get_repos()
+    groups = get_groups()
+    ctx = get_context()
+    for i, word in enumerate(input):
+        if word in repos or word in groups:
+            names.append(word)
+        else:
+            break
+    else:  # all input is repos and groups, shift the index once more
+        if i is not None:
+            i += 1
+    if not names and ctx:
+        names = [ctx.stem]
+    if quote_mode and i + 1 != len(input):
+        print(input[i], "is not a repo or group")
+        sys.exit(2)

+    if names:
+        chosen = {}
+        for k in names:
+            if k in repos:
+                chosen[k] = repos[k]
+            if k in groups:
+                for r in groups[k]["repos"]:
+                    chosen[r] = repos[r]
+        # if not set here, all repos are chosen
+        repos = chosen
+    return repos, input[i:]
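The commit replaces the YAML config files with two CSV files: repos.csv is comma-delimited with columns path, name, type, flags; groups.csv is colon-delimited with columns name, repos, path. A minimal sketch of how such rows parse, reusing the field names and delimiters from get_repos() and get_groups() above; the sample rows and paths are made up for illustration:

import csv
import io

# Hypothetical repos.csv row: path,name,type,flags (type is currently unused)
repo_rows = csv.DictReader(
    io.StringIO("/home/user/code/proj,proj,,\n"),
    ["path", "name", "type", "flags"],
    restval="",
)
print({r["name"]: {"path": r["path"], "flags": r["flags"].split()} for r in repo_rows})
# -> {'proj': {'path': '/home/user/code/proj', 'flags': []}}

# Hypothetical groups.csv row: group-name:repo1 repo2:group-path
group_rows = csv.DictReader(
    io.StringIO("work:proj other:/home/user/code\n"),
    ["name", "repos", "path"],
    restval="",
    delimiter=":",
)
print({r["name"]: {"repos": r["repos"].split(), "path": r["path"]} for r in group_rows})
# -> {'work': {'repos': ['proj', 'other'], 'path': '/home/user/code'}}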
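In auto context mode, get_context() picks the group whose path is the closest ancestor of the current directory; the distance is simply the number of components left over by Path.relative_to(), which is what get_relative_path() computes. A self-contained illustration with made-up paths:

import os
from pathlib import Path

def rel_components(kid, parent):
    # Roughly mirrors get_relative_path(): None when `kid` is not under `parent`
    try:
        return str(Path(kid).relative_to(parent)).split(os.sep)
    except ValueError:
        return None

print(rel_components("/a/b/c/d/here", "/a/b"))  # ['c', 'd', 'here'], distance 3
print(rel_components("/a/b/c", "/x/y"))         # None, so not a candidate group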
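describe() now renders repo status lines in a thread pool, and the output order stays deterministic because executor.map yields results in the order of its input, not in completion order. A quick demonstration:

import time
from concurrent.futures import ThreadPoolExecutor

def slow_upper(s: str) -> str:
    time.sleep(0.03 - 0.01 * len(s))  # later items finish first
    return s.upper()

with ThreadPoolExecutor(max_workers=3) as executor:
    print(list(executor.map(slow_upper, ["a", "bb", "ccc"])))
# -> ['A', 'BB', 'CCC'], matching the input order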
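The for/else in parse_repos_and_rest() is easy to misread: the else branch runs only when the loop finishes without hitting break, i.e. when every word names a repo or group, and in that case the split index must be pushed past the last word. A toy version of the same idiom, with hypothetical names:

known = {"repo1", "group1"}

def split_args(words):
    i = 0
    for i, w in enumerate(words):
        if w not in known:
            break
    else:  # no break: every word was a known name
        i += 1
    return words[:i], words[i:]

print(split_args(["repo1", "shell", "ls"]))  # (['repo1'], ['shell', 'ls'])
print(split_args(["repo1", "group1"]))       # (['repo1', 'group1'], [])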