From 8fd7f9bfed753dbaa5543747569b4c2543aff03d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 17 Jul 2021 09:26:34 +0200 Subject: Merging upstream version 0.15.1. Signed-off-by: Daniel Baumann --- gita/utils.py | 261 ++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 198 insertions(+), 63 deletions(-) (limited to 'gita/utils.py') diff --git a/gita/utils.py b/gita/utils.py index 9572f02..34fc435 100644 --- a/gita/utils.py +++ b/gita/utils.py @@ -1,62 +1,76 @@ import os -import yaml +import json +import csv import asyncio import platform +import subprocess from functools import lru_cache, partial from pathlib import Path -from typing import List, Dict, Coroutine, Union, Iterator +from typing import List, Dict, Coroutine, Union, Iterator, Tuple +from collections import Counter, defaultdict from . import info from . import common -@lru_cache() -def get_context() -> Union[Path, None]: +# TODO: python3.9 pathlib has is_relative_to() function +def is_relative_to(kid: str, parent: str) -> bool: """ - Return the context: either a group name or 'none' + Both the `kid` and `parent` should be absolute path """ - config_dir = Path(common.get_config_dir()) - matches = list(config_dir.glob('*.context')) - assert len(matches) < 2, "Cannot have multiple .context file" - return matches[0] if matches else None + return parent == os.path.commonpath((kid, parent)) @lru_cache() -def get_repos() -> Dict[str, str]: +def get_repos(root=None) -> Dict[str, Dict[str, str]]: """ - Return a `dict` of repo name to repo absolute path + Return a `dict` of repo name to repo absolute path and repo type + + @param root: Use local config if set. If None, use either global or local + config depending on cwd. """ - path_file = common.get_config_fname('repo_path') + path_file = common.get_config_fname('repos.csv', root) repos = {} - # Each line is a repo path and repo name separated by , if os.path.isfile(path_file) and os.stat(path_file).st_size > 0: with open(path_file) as f: - for line in f: - line = line.rstrip() - if not line: # blank line - continue - path, name = line.split(',') - if not is_git(path): - continue - if name not in repos: - repos[name] = path - else: # repo name collision for different paths: include parent path name - par_name = os.path.basename(os.path.dirname(path)) - repos[os.path.join(par_name, name)] = path + rows = csv.DictReader(f, ['path', 'name', 'type', 'flags'], + restval='') # it's actually a reader + repos = {r['name']: + {'path': r['path'], 'type': r['type'], + 'flags': r['flags'].split()} + for r in rows if is_git(r['path'], is_bare=True)} + if root is None: # detect if inside a main path + cwd = os.getcwd() + for prop in repos.values(): + path = prop['path'] + if prop['type'] == 'm' and is_relative_to(cwd, path): + return get_repos(path) return repos +@lru_cache() +def get_context() -> Union[Path, None]: + """ + Return the context: either a group name or 'none' + """ + config_dir = Path(common.get_config_dir()) + matches = list(config_dir.glob('*.context')) + assert len(matches) < 2, "Cannot have multiple .context file" + return matches[0] if matches else None + + @lru_cache() def get_groups() -> Dict[str, List[str]]: """ Return a `dict` of group name to repo names. """ - fname = common.get_config_fname('groups.yml') + fname = common.get_config_fname('groups.csv') groups = {} # Each line is a repo path and repo name separated by , if os.path.isfile(fname) and os.stat(fname).st_size > 0: with open(fname, 'r') as f: - groups = yaml.load(f, Loader=yaml.FullLoader) + rows = csv.reader(f, delimiter=':') + groups = {r[0]: r[1].split() for r in rows} return groups @@ -75,10 +89,12 @@ def get_choices() -> List[Union[str, None]]: return choices -def is_git(path: str) -> bool: +def is_git(path: str, is_bare=False) -> bool: """ Return True if the path is a git repo. """ + if not os.path.exists(path): + return False # An alternative is to call `git rev-parse --is-inside-work-tree` # I don't see why that one is better yet. # For a regular git repo, .git is a folder, for a worktree repo, .git is a file. @@ -88,59 +104,172 @@ def is_git(path: str) -> bool: # `git rev-parse --git-common-dir` loc = os.path.join(path, '.git') # TODO: we can display the worktree repos in a different font. - return os.path.exists(loc) - - -def rename_repo(repos: Dict[str, str], repo: str, new_name: str): + if os.path.exists(loc): + return True + if not is_bare: + return False + # detect bare repo + got = subprocess.run('git rev-parse --is-bare-repository'.split(), + stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, + cwd=path + ) + if got.returncode == 0 and got.stdout == b'true\n': + return True + return False + +def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str): """ Write new repo name to file """ - path = repos[repo] + if new_name in repos: + print(f"{new_name} is already in use!") + return + prop = repos[repo] del repos[repo] - repos[new_name] = path - write_to_repo_file(repos, 'w') - - -def write_to_repo_file(repos: Dict[str, str], mode: str): + repos[new_name] = prop + # write to local config if inside a main path + main_paths = (prop['path'] for prop in repos.values() if prop['type'] == 'm') + cwd = os.getcwd() + is_local_config = True + for p in main_paths: + if is_relative_to(cwd, p): + write_to_repo_file(repos, 'w', p) + break + else: # global config + write_to_repo_file(repos, 'w') + is_local_config = False + # update groups only when outside any main repos + if is_local_config: + return + groups = get_groups() + for g, members in groups.items(): + if repo in members: + members.remove(repo) + members.append(new_name) + groups[g] = sorted(members) + write_to_groups_file(groups, 'w') + + +def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str, root=None): """ + @param repos: each repo is {name: {properties}} """ - data = ''.join(f'{path},{name}\n' for name, path in repos.items()) - fname = common.get_config_fname('repo_path') + data = [(prop['path'], name, prop['type'], ' '.join(prop['flags'])) + for name, prop in repos.items()] + fname = common.get_config_fname('repos.csv', root) os.makedirs(os.path.dirname(fname), exist_ok=True) - with open(fname, mode) as f: - f.write(data) + with open(fname, mode, newline='') as f: + writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerows(data) def write_to_groups_file(groups: Dict[str, List[str]], mode: str): """ """ - fname = common.get_config_fname('groups.yml') + fname = common.get_config_fname('groups.csv') os.makedirs(os.path.dirname(fname), exist_ok=True) if not groups: # all groups are deleted open(fname, 'w').close() else: - with open(fname, mode) as f: - yaml.dump(groups, f, default_flow_style=None) + with open(fname, mode, newline='') as f: + data = [ + (group, ' '.join(repos)) + for group, repos in groups.items() + ] + writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerows(data) + + +def _make_name(path: str, repos: Dict[str, Dict[str, str]], + name_counts: Counter) -> str: + """ + Given a new repo `path`, create a repo name. By default, basename is used. + If name collision exists, further include parent path name. + + @param path: It should not be in `repos` and is absolute + """ + name = os.path.basename(os.path.normpath(path)) + if name in repos or name_counts[name] > 1: + par_name = os.path.basename(os.path.dirname(path)) + return os.path.join(par_name, name) + return name -def add_repos(repos: Dict[str, str], new_paths: List[str]): +def _get_repo_type(path, repo_type, root) -> str: """ - Write new repo paths to file + + """ + if repo_type != '': # explicitly set + return repo_type + if root is not None and os.path.normpath(root) == os.path.normpath(path): + return 'm' + return '' + + +def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str], + repo_type='', root=None, is_bare=False) -> Dict[str, Dict[str, str]]: + """ + Write new repo paths to file; return the added repos. @param repos: name -> path """ - existing_paths = set(repos.values()) - new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p)) + existing_paths = {prop['path'] for prop in repos.values()} + new_paths = {p for p in new_paths if is_git(p, is_bare)} new_paths = new_paths - existing_paths + new_repos = {} if new_paths: print(f"Found {len(new_paths)} new repo(s).") - new_repos = { - os.path.basename(os.path.normpath(path)): path - for path in new_paths} - write_to_repo_file(new_repos, 'a+') + name_counts = Counter( + os.path.basename(os.path.normpath(p)) for p in new_paths + ) + new_repos = {_make_name(path, repos, name_counts): { + 'path': path, + 'type': _get_repo_type(path, repo_type, root), + 'flags': '', + } for path in new_paths} + # When root is not None, we could optionally set its type to 'm', i.e., + # main repo. + write_to_repo_file(new_repos, 'a+', root) else: print('No new repos found!') + return new_repos + + +def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[str, ...]: + """ + Return relative parent strings + + For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/ + then return (b, c, d) + """ + for p in paths: + if is_relative_to(repo_path, p): + break + else: + return () + return (os.path.basename(p), + *os.path.normpath(os.path.relpath(repo_path, p)).split(os.sep)[:-1]) + + +def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str] + ) -> Dict[str, List[str]]: + """ + + """ + # FIXME: the upstream code should make sure that paths are all independent + # i.e., each repo should be contained in one and only one path + new_groups = defaultdict(list) + for repo_name, prop in repos.items(): + hash = _generate_dir_hash(prop['path'], paths) + if not hash: + continue + for i in range(1, len(hash)+1): + group_name = '-'.join(hash[:i]) + new_groups[group_name].append(repo_name) + # FIXME: need to make sure the new group names don't clash with old ones + # or repo names + return new_groups def parse_clone_config(fname: str) -> Iterator[List[str]]: @@ -157,6 +286,7 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s Run `cmds` asynchronously in `path` directory. Return the `path` if execution fails. """ + # TODO: deprecated since 3.8, will be removed in 3.10 process = await asyncio.create_subprocess_exec( *cmds, stdin=asyncio.subprocess.DEVNULL, @@ -199,7 +329,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: return errors -def describe(repos: Dict[str, str], no_colors: bool=False) -> str: +def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str: """ Return the status of all repos """ @@ -213,9 +343,14 @@ def describe(repos: Dict[str, str], no_colors: bool=False) -> str: funcs[idx] = partial(get_repo_status, no_colors=True) for name in sorted(repos): - path = repos[name] - info_items = ' '.join(f(path) for f in funcs) - yield f'{name:<{name_width}}{info_items}' + info_items = ' '.join(f(repos[name]) for f in funcs) + if repos[name]['type'] == 'm': + # ANSI color code also takes length in Python + name = f'{info.Color.underline}{name}{info.Color.end}' + width = name_width + 8 + yield f'{name:<{width}}{info_items}' + else: + yield f'{name:<{name_width}}{info_items}' def get_cmds_from_files() -> Dict[str, Dict[str, str]]: @@ -231,17 +366,17 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]: } """ # default config file - fname = os.path.join(os.path.dirname(__file__), "cmds.yml") - with open(fname, 'r') as stream: - cmds = yaml.load(stream, Loader=yaml.FullLoader) + fname = os.path.join(os.path.dirname(__file__), "cmds.json") + with open(fname, 'r') as f: + cmds = json.load(f) # custom config file root = common.get_config_dir() - fname = os.path.join(root, 'cmds.yml') + fname = os.path.join(root, 'cmds.json') custom_cmds = {} if os.path.isfile(fname) and os.path.getsize(fname): - with open(fname, 'r') as stream: - custom_cmds = yaml.load(stream, Loader=yaml.FullLoader) + with open(fname, 'r') as f: + custom_cmds = json.load(f) # custom commands shadow default ones cmds.update(custom_cmds) -- cgit v1.2.3