From bddee63922e227c73fd1b4e5b50bbef56bb9a61f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Tue, 18 Aug 2020 22:21:46 +0200 Subject: Adding upstream version 0.10.9. Signed-off-by: Daniel Baumann --- gita/utils.py | 225 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 gita/utils.py (limited to 'gita/utils.py') diff --git a/gita/utils.py b/gita/utils.py new file mode 100644 index 0000000..d14484a --- /dev/null +++ b/gita/utils.py @@ -0,0 +1,225 @@ +import os +import yaml +import asyncio +import platform +from functools import lru_cache +from typing import List, Dict, Coroutine, Union + +from . import info +from . import common + + +def get_config_fname(fname: str) -> str: + """ + Return the file name that stores the repo locations. + """ + root = common.get_config_dir() + return os.path.join(root, fname) + + +@lru_cache() +def get_repos() -> Dict[str, str]: + """ + Return a `dict` of repo name to repo absolute path + """ + path_file = get_config_fname('repo_path') + repos = {} + # Each line is a repo path and repo name separated by , + if os.path.isfile(path_file) and os.stat(path_file).st_size > 0: + with open(path_file) as f: + for line in f: + line = line.rstrip() + if not line: # blank line + continue + path, name = line.split(',') + if not is_git(path): + continue + if name not in repos: + repos[name] = path + else: # repo name collision for different paths: include parent path name + par_name = os.path.basename(os.path.dirname(path)) + repos[os.path.join(par_name, name)] = path + return repos + + +@lru_cache() +def get_groups() -> Dict[str, List[str]]: + """ + Return a `dict` of group name to repo names. + """ + fname = get_config_fname('groups.yml') + groups = {} + # Each line is a repo path and repo name separated by , + if os.path.isfile(fname) and os.stat(fname).st_size > 0: + with open(fname, 'r') as f: + groups = yaml.load(f, Loader=yaml.FullLoader) + return groups + + + +def get_choices() -> List[Union[str, None]]: + """ + Return all repo names, group names, and an additional empty list. The empty + list is added as a workaround of + argparse's problem with coexisting nargs='*' and choices. + See https://utcc.utoronto.ca/~cks/space/blog/python/ArgparseNargsChoicesLimitation + and + https://bugs.python.org/issue27227 + """ + choices = list(get_repos()) + choices.extend(get_groups()) + choices.append([]) + return choices + + +def is_git(path: str) -> bool: + """ + Return True if the path is a git repo. + """ + # An alternative is to call `git rev-parse --is-inside-work-tree` + # I don't see why that one is better yet. + # For a regular git repo, .git is a folder, for a worktree repo, .git is a file. + # However, git submodule repo also has .git as a file. + # A more reliable way to differentiable regular and worktree repos is to + # compare the result of `git rev-parse --git-dir` and + # `git rev-parse --git-common-dir` + loc = os.path.join(path, '.git') + # TODO: we can display the worktree repos in a different font. + return os.path.exists(loc) + + +def rename_repo(repos: Dict[str, str], repo: str, new_name: str): + """ + Write new repo name to file + """ + path = repos[repo] + del repos[repo] + repos[new_name] = path + write_to_repo_file(repos, 'w') + + +def write_to_repo_file(repos: Dict[str, str], mode: str): + """ + """ + data = ''.join(f'{path},{name}\n' for name, path in repos.items()) + fname = get_config_fname('repo_path') + os.makedirs(os.path.dirname(fname), exist_ok=True) + with open(fname, mode) as f: + f.write(data) + + +def write_to_groups_file(groups: Dict[str, List[str]], mode: str): + """ + + """ + fname = get_config_fname('groups.yml') + os.makedirs(os.path.dirname(fname), exist_ok=True) + with open(fname, mode) as f: + yaml.dump(groups, f, default_flow_style=None) + + +def add_repos(repos: Dict[str, str], new_paths: List[str]): + """ + Write new repo paths to file + """ + existing_paths = set(repos.values()) + new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p)) + new_paths = new_paths - existing_paths + if new_paths: + print(f"Found {len(new_paths)} new repo(s).") + new_repos = { + os.path.basename(os.path.normpath(path)): path + for path in new_paths} + write_to_repo_file(new_repos, 'a+') + else: + print('No new repos found!') + + +async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]: + """ + Run `cmds` asynchronously in `path` directory. Return the `path` if + execution fails. + """ + process = await asyncio.create_subprocess_exec( + *cmds, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + start_new_session=True, + cwd=path) + stdout, stderr = await process.communicate() + for pipe in (stdout, stderr): + if pipe: + print(format_output(pipe.decode(), f'{repo_name}: ')) + # The existence of stderr is not good indicator since git sometimes write + # to stderr even if the execution is successful, e.g. git fetch + if process.returncode != 0: + return path + + +def format_output(s: str, prefix: str): + """ + Prepends every line in given string with the given prefix. + """ + return ''.join([f'{prefix}{line}' for line in s.splitlines(keepends=True)]) + + +def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: + """ + Execute tasks asynchronously + """ + # TODO: asyncio API is nicer in python 3.7 + if platform.system() == 'Windows': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + + try: + errors = loop.run_until_complete(asyncio.gather(*tasks)) + finally: + loop.close() + return errors + + +def describe(repos: Dict[str, str]) -> str: + """ + Return the status of all repos + """ + if repos: + name_width = max(len(n) for n in repos) + 1 + funcs = info.get_info_funcs() + for name in sorted(repos): + path = repos[name] + display_items = ' '.join(f(path) for f in funcs) + yield f'{name:<{name_width}}{display_items}' + + +def get_cmds_from_files() -> Dict[str, Dict[str, str]]: + """ + Parse delegated git commands from default config file + and custom config file. + + Example return + { + 'branch': {'help': 'show local branches'}, + 'clean': {'cmd': 'clean -dfx', + 'help': 'remove all untracked files/folders'}, + } + """ + # default config file + fname = os.path.join(os.path.dirname(__file__), "cmds.yml") + with open(fname, 'r') as stream: + cmds = yaml.load(stream, Loader=yaml.FullLoader) + + # custom config file + root = common.get_config_dir() + fname = os.path.join(root, 'cmds.yml') + custom_cmds = {} + if os.path.isfile(fname) and os.path.getsize(fname): + with open(fname, 'r') as stream: + custom_cmds = yaml.load(stream, Loader=yaml.FullLoader) + + # custom commands shadow default ones + cmds.update(custom_cmds) + return cmds -- cgit v1.2.3