From bddee63922e227c73fd1b4e5b50bbef56bb9a61f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Tue, 18 Aug 2020 22:21:46 +0200 Subject: Adding upstream version 0.10.9. Signed-off-by: Daniel Baumann --- gita/__init__.py | 3 + gita/__main__.py | 289 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ gita/cmds.yml | 65 +++++++++++++ gita/common.py | 7 ++ gita/info.py | 146 ++++++++++++++++++++++++++++ gita/utils.py | 225 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 735 insertions(+) create mode 100644 gita/__init__.py create mode 100644 gita/__main__.py create mode 100644 gita/cmds.yml create mode 100644 gita/common.py create mode 100644 gita/info.py create mode 100644 gita/utils.py (limited to 'gita') diff --git a/gita/__init__.py b/gita/__init__.py new file mode 100644 index 0000000..eeb79a3 --- /dev/null +++ b/gita/__init__.py @@ -0,0 +1,3 @@ +import pkg_resources + +__version__ = pkg_resources.get_distribution('gita').version diff --git a/gita/__main__.py b/gita/__main__.py new file mode 100644 index 0000000..ba0d270 --- /dev/null +++ b/gita/__main__.py @@ -0,0 +1,289 @@ +''' +Gita manages multiple git repos. It has two functionalities + + 1. display the status of multiple repos side by side + 2. delegate git commands/aliases from any working directory + +Examples: + gita ls + gita fetch + gita stat myrepo2 + gita super myrepo1 commit -am 'add some cool feature' + +For bash auto completion, download and source +https://github.com/nosarthur/gita/blob/master/.gita-completion.bash +''' + +import os +import argparse +import subprocess +import pkg_resources + +from . import utils, info + + +def f_add(args: argparse.Namespace): + repos = utils.get_repos() + utils.add_repos(repos, args.paths) + + +def f_rename(args: argparse.Namespace): + repos = utils.get_repos() + utils.rename_repo(repos, args.repo[0], args.new_name) + + +def f_info(_): + all_items, to_display = info.get_info_items() + print('In use:', ','.join(to_display)) + unused = set(all_items) - set(to_display) + if unused: + print('Unused:', ' '.join(unused)) + + +def f_ll(args: argparse.Namespace): + """ + Display details of all repos + """ + repos = utils.get_repos() + if args.group: # only display repos in this group + group_repos = utils.get_groups()[args.group] + repos = {k: repos[k] for k in group_repos if k in repos} + for line in utils.describe(repos): + print(line) + + +def f_ls(args: argparse.Namespace): + repos = utils.get_repos() + if args.repo: # one repo, show its path + print(repos[args.repo]) + else: # show names of all repos + print(' '.join(repos)) + + +def f_group(args: argparse.Namespace): + repos = utils.get_repos() + groups = utils.get_groups() + if args.to_group: + gname = input('group name? ') + if gname in groups: + gname_repos = set(groups[gname]) + gname_repos.update(args.to_group) + groups[gname] = sorted(gname_repos) + utils.write_to_groups_file(groups, 'w') + else: + utils.write_to_groups_file({gname: sorted(args.to_group)}, 'a+') + else: + for group, repos in groups.items(): + print(f"{group}: {', '.join(repos)}") + + +def f_ungroup(args: argparse.Namespace): + groups = utils.get_groups() + to_ungroup = set(args.to_ungroup) + to_del = [] + for name, repos in groups.items(): + remaining = set(repos) - to_ungroup + if remaining: + groups[name] = list(sorted(remaining)) + else: + to_del.append(name) + for name in to_del: + del groups[name] + utils.write_to_groups_file(groups, 'w') + + +def f_rm(args: argparse.Namespace): + """ + Unregister repo(s) from gita + """ + path_file = utils.get_config_fname('repo_path') + if os.path.isfile(path_file): + repos = utils.get_repos() + for repo in args.repo: + del repos[repo] + utils.write_to_repo_file(repos, 'w') + + +def f_git_cmd(args: argparse.Namespace): + """ + Delegate git command/alias defined in `args.cmd`. Asynchronous execution is + disabled for commands in the `args.async_blacklist`. + """ + repos = utils.get_repos() + groups = utils.get_groups() + if args.repo: # with user specified repo(s) or group(s) + chosen = {} + for k in args.repo: + if k in repos: + chosen[k] = repos[k] + if k in groups: + for r in groups[k]: + chosen[r] = repos[r] + repos = chosen + cmds = ['git'] + args.cmd + if len(repos) == 1 or cmds[1] in args.async_blacklist: + for path in repos.values(): + print(path) + subprocess.run(cmds, cwd=path) + else: # run concurrent subprocesses + # Async execution cannot deal with multiple repos' user name/password. + # Here we shut off any user input in the async execution, and re-run + # the failed ones synchronously. + errors = utils.exec_async_tasks( + utils.run_async(repo_name, path, cmds) for repo_name, path in repos.items()) + for path in errors: + if path: + print(path) + subprocess.run(cmds, cwd=path) + + +def f_super(args): + """ + Delegate git command/alias defined in `args.man`, which may or may not + contain repo names. + """ + names = [] + repos = utils.get_repos() + groups = utils.get_groups() + for i, word in enumerate(args.man): + if word in repos or word in groups: + names.append(word) + else: + break + args.cmd = args.man[i:] + args.repo = names + f_git_cmd(args) + + +def main(argv=None): + p = argparse.ArgumentParser(prog='gita', + formatter_class=argparse.RawTextHelpFormatter, + description=__doc__) + subparsers = p.add_subparsers(title='sub-commands', + help='additional help with sub-command -h') + + version = pkg_resources.require('gita')[0].version + p.add_argument('-v', + '--version', + action='version', + version=f'%(prog)s {version}') + + # bookkeeping sub-commands + p_add = subparsers.add_parser('add', help='add repo(s)') + p_add.add_argument('paths', nargs='+', help="add repo(s)") + p_add.set_defaults(func=f_add) + + p_rm = subparsers.add_parser('rm', help='remove repo(s)') + p_rm.add_argument('repo', + nargs='+', + choices=utils.get_repos(), + help="remove the chosen repo(s)") + p_rm.set_defaults(func=f_rm) + + p_rename = subparsers.add_parser('rename', help='rename a repo') + p_rename.add_argument( + 'repo', + nargs=1, + choices=utils.get_repos(), + help="rename the chosen repo") + p_rename.add_argument( + 'new_name', + help="new name") + p_rename.set_defaults(func=f_rename) + + p_info = subparsers.add_parser('info', help='show information items of the ll sub-command') + p_info.set_defaults(func=f_info) + + ll_doc = f''' status symbols: + +: staged changes + *: unstaged changes + _: untracked files/folders + + branch colors: + {info.Color.white}white{info.Color.end}: local has no remote + {info.Color.green}green{info.Color.end}: local is the same as remote + {info.Color.red}red{info.Color.end}: local has diverged from remote + {info.Color.purple}purple{info.Color.end}: local is ahead of remote (good for push) + {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)''' + p_ll = subparsers.add_parser('ll', + help='display summary of all repos', + formatter_class=argparse.RawTextHelpFormatter, + description=ll_doc) + p_ll.add_argument('group', + nargs='?', + choices=utils.get_groups(), + help="show repos in the chosen group") + p_ll.set_defaults(func=f_ll) + + p_ls = subparsers.add_parser( + 'ls', help='display names of all repos, or path of a chosen repo') + p_ls.add_argument('repo', + nargs='?', + choices=utils.get_repos(), + help="show path of the chosen repo") + p_ls.set_defaults(func=f_ls) + + p_group = subparsers.add_parser( + 'group', help='group repos or display names of all groups if no repo is provided') + p_group.add_argument('to_group', + nargs='*', + choices=utils.get_choices(), + help="repo(s) to be grouped") + p_group.set_defaults(func=f_group) + + p_ungroup = subparsers.add_parser( + 'ungroup', help='remove group information for repos', + description="Remove group information on repos") + p_ungroup.add_argument('to_ungroup', + nargs='+', + choices=utils.get_repos(), + help="repo(s) to be ungrouped") + p_ungroup.set_defaults(func=f_ungroup) + + # superman mode + p_super = subparsers.add_parser( + 'super', + help='superman mode: delegate any git command/alias in specified or ' + 'all repo(s).\n' + 'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n' + '\t gita super repo1 repo2 repo3 checkout new-feature') + p_super.add_argument( + 'man', + nargs=argparse.REMAINDER, + help="execute arbitrary git command/alias for specified or all repos " + "Example: gita super myrepo1 diff --name-only --staged " + "Another: gita super checkout master ") + p_super.set_defaults(func=f_super) + + # sub-commands that fit boilerplate + cmds = utils.get_cmds_from_files() + for name, data in cmds.items(): + help = data.get('help') + cmd = data.get('cmd') or name + if data.get('allow_all'): + choices = utils.get_choices() + nargs = '*' + help += ' for all repos or' + else: + choices = utils.get_repos().keys() | utils.get_groups().keys() + nargs = '+' + help += ' for the chosen repo(s) or group(s)' + sp = subparsers.add_parser(name, help=help) + sp.add_argument('repo', nargs=nargs, choices=choices, help=help) + sp.set_defaults(func=f_git_cmd, cmd=cmd.split()) + + args = p.parse_args(argv) + + args.async_blacklist = { + name + for name, data in cmds.items() if data.get('disable_async') + } + + if 'func' in args: + args.func(args) + else: + p.print_help() # pragma: no cover + + +if __name__ == '__main__': + main() # pragma: no cover diff --git a/gita/cmds.yml b/gita/cmds.yml new file mode 100644 index 0000000..8db932e --- /dev/null +++ b/gita/cmds.yml @@ -0,0 +1,65 @@ +br: + cmd: branch -vv + help: show local branches +clean: + cmd: clean -dfx + help: remove all untracked files/folders +diff: + help: show differences +difftool: + disable_async: true + help: show differences using a tool +fetch: + allow_all: true + help: fetch remote update +last: + cmd: log -1 HEAD + help: show log information of HEAD +log: + disable_async: true + help: show logs +merge: + cmd: merge @{u} + help: merge remote updates +mergetool: + disable_async: true + help: merge updates with a tool +patch: + cmd: format-patch HEAD~ + help: make a patch +pull: + allow_all: true + help: pull remote updates +push: + help: push the local updates +rebase: + help: rebase from master +reflog: + help: show ref logs +remote: + cmd: remote -v + help: show remote settings +reset: + help: reset repo(s) +shortlog: + disable_async: true + help: show short log +show: + disable_async: true + help: show detailed commit information +show-branch: + disable_async: true + help: show detailed branch information +stash: + help: store uncommited changes +stat: + cmd: diff --stat + help: show edit statistics +st: + help: show status +tag: + cmd: tag -n + help: show tags +whatchanged: + disable_async: true + help: show detailed log diff --git a/gita/common.py b/gita/common.py new file mode 100644 index 0000000..61df049 --- /dev/null +++ b/gita/common.py @@ -0,0 +1,7 @@ +import os + +def get_config_dir() -> str: + parent = os.environ.get('XDG_CONFIG_HOME') or os.path.join( + os.path.expanduser('~'), '.config') + root = os.path.join(parent,"gita") + return root diff --git a/gita/info.py b/gita/info.py new file mode 100644 index 0000000..18d20fd --- /dev/null +++ b/gita/info.py @@ -0,0 +1,146 @@ +import os +import sys +import yaml +import subprocess +from typing import Tuple, List, Callable, Dict +from . import common + + +class Color: + """ + Terminal color + """ + red = '\x1b[31m' # local diverges from remote + green = '\x1b[32m' # local == remote + yellow = '\x1b[33m' # local is behind + blue = '\x1b[34m' + purple = '\x1b[35m' # local is ahead + cyan = '\x1b[36m' + white = '\x1b[37m' # no remote branch + end = '\x1b[0m' + + +def get_info_funcs() -> List[Callable[[str], str]]: + """ + Return the functions to generate `gita ll` information. All these functions + take the repo path as input and return the corresponding information as str. + See `get_path`, `get_repo_status`, `get_common_commit` for examples. + """ + info_items, to_display = get_info_items() + return [info_items[k] for k in to_display] + + +def get_info_items() -> Tuple[Dict[str, Callable[[str], str]], List[str]]: + """ + Return the available information items for display in the `gita ll` + sub-command, and the ones to be displayed. + It loads custom information functions and configuration if they exist. + """ + # default settings + info_items = {'branch': get_repo_status, + 'commit_msg': get_commit_msg, + 'path': get_path, } + display_items = ['branch', 'commit_msg'] + + # custom settings + root = common.get_config_dir() + src_fname = os.path.join(root, 'extra_repo_info.py') + yml_fname = os.path.join(root, 'info.yml') + if os.path.isfile(src_fname): + sys.path.append(root) + from extra_repo_info import extra_info_items + info_items.update(extra_info_items) + if os.path.isfile(yml_fname): + with open(yml_fname, 'r') as stream: + display_items = yaml.load(stream, Loader=yaml.FullLoader) + display_items = [x for x in display_items if x in info_items] + return info_items, display_items + + +def get_path(path): + return Color.cyan + path + Color.end + + +def get_head(path: str) -> str: + result = subprocess.run('git rev-parse --abbrev-ref HEAD'.split(), + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + universal_newlines=True, + cwd=path) + return result.stdout.strip() + + +def run_quiet_diff(args: List[str]) -> bool: + """ + Return the return code of git diff `args` in quiet mode + """ + result = subprocess.run( + ['git', 'diff', '--quiet'] + args, + stderr=subprocess.DEVNULL, + ) + return result.returncode + + +def get_common_commit() -> str: + """ + Return the hash of the common commit of the local and upstream branches. + """ + result = subprocess.run('git merge-base @{0} @{u}'.split(), + stdout=subprocess.PIPE, + universal_newlines=True) + return result.stdout.strip() + + +def has_untracked() -> bool: + """ + Return True if untracked file/folder exists + """ + result = subprocess.run('git ls-files -zo --exclude-standard'.split(), + stdout=subprocess.PIPE) + return bool(result.stdout) + + +def get_commit_msg(path: str) -> str: + """ + Return the last commit message. + """ + # `git show-branch --no-name HEAD` is faster than `git show -s --format=%s` + result = subprocess.run('git show-branch --no-name HEAD'.split(), + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + universal_newlines=True, + cwd=path) + return result.stdout.strip() + + +def get_repo_status(path: str) -> str: + head = get_head(path) + dirty, staged, untracked, color = _get_repo_status(path) + return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}' + + +def _get_repo_status(path: str) -> Tuple[str]: + """ + Return the status of one repo + """ + os.chdir(path) + dirty = '*' if run_quiet_diff([]) else '' + staged = '+' if run_quiet_diff(['--cached']) else '' + untracked = '_' if has_untracked() else '' + + diff_returncode = run_quiet_diff(['@{u}', '@{0}']) + has_no_remote = diff_returncode == 128 + has_no_diff = diff_returncode == 0 + if has_no_remote: + color = Color.white + elif has_no_diff: + color = Color.green + else: + common_commit = get_common_commit() + outdated = run_quiet_diff(['@{u}', common_commit]) + if outdated: + diverged = run_quiet_diff(['@{0}', common_commit]) + color = Color.red if diverged else Color.yellow + else: # local is ahead of remote + color = Color.purple + return dirty, staged, untracked, color diff --git a/gita/utils.py b/gita/utils.py new file mode 100644 index 0000000..d14484a --- /dev/null +++ b/gita/utils.py @@ -0,0 +1,225 @@ +import os +import yaml +import asyncio +import platform +from functools import lru_cache +from typing import List, Dict, Coroutine, Union + +from . import info +from . import common + + +def get_config_fname(fname: str) -> str: + """ + Return the file name that stores the repo locations. + """ + root = common.get_config_dir() + return os.path.join(root, fname) + + +@lru_cache() +def get_repos() -> Dict[str, str]: + """ + Return a `dict` of repo name to repo absolute path + """ + path_file = get_config_fname('repo_path') + repos = {} + # Each line is a repo path and repo name separated by , + if os.path.isfile(path_file) and os.stat(path_file).st_size > 0: + with open(path_file) as f: + for line in f: + line = line.rstrip() + if not line: # blank line + continue + path, name = line.split(',') + if not is_git(path): + continue + if name not in repos: + repos[name] = path + else: # repo name collision for different paths: include parent path name + par_name = os.path.basename(os.path.dirname(path)) + repos[os.path.join(par_name, name)] = path + return repos + + +@lru_cache() +def get_groups() -> Dict[str, List[str]]: + """ + Return a `dict` of group name to repo names. + """ + fname = get_config_fname('groups.yml') + groups = {} + # Each line is a repo path and repo name separated by , + if os.path.isfile(fname) and os.stat(fname).st_size > 0: + with open(fname, 'r') as f: + groups = yaml.load(f, Loader=yaml.FullLoader) + return groups + + + +def get_choices() -> List[Union[str, None]]: + """ + Return all repo names, group names, and an additional empty list. The empty + list is added as a workaround of + argparse's problem with coexisting nargs='*' and choices. + See https://utcc.utoronto.ca/~cks/space/blog/python/ArgparseNargsChoicesLimitation + and + https://bugs.python.org/issue27227 + """ + choices = list(get_repos()) + choices.extend(get_groups()) + choices.append([]) + return choices + + +def is_git(path: str) -> bool: + """ + Return True if the path is a git repo. + """ + # An alternative is to call `git rev-parse --is-inside-work-tree` + # I don't see why that one is better yet. + # For a regular git repo, .git is a folder, for a worktree repo, .git is a file. + # However, git submodule repo also has .git as a file. + # A more reliable way to differentiable regular and worktree repos is to + # compare the result of `git rev-parse --git-dir` and + # `git rev-parse --git-common-dir` + loc = os.path.join(path, '.git') + # TODO: we can display the worktree repos in a different font. + return os.path.exists(loc) + + +def rename_repo(repos: Dict[str, str], repo: str, new_name: str): + """ + Write new repo name to file + """ + path = repos[repo] + del repos[repo] + repos[new_name] = path + write_to_repo_file(repos, 'w') + + +def write_to_repo_file(repos: Dict[str, str], mode: str): + """ + """ + data = ''.join(f'{path},{name}\n' for name, path in repos.items()) + fname = get_config_fname('repo_path') + os.makedirs(os.path.dirname(fname), exist_ok=True) + with open(fname, mode) as f: + f.write(data) + + +def write_to_groups_file(groups: Dict[str, List[str]], mode: str): + """ + + """ + fname = get_config_fname('groups.yml') + os.makedirs(os.path.dirname(fname), exist_ok=True) + with open(fname, mode) as f: + yaml.dump(groups, f, default_flow_style=None) + + +def add_repos(repos: Dict[str, str], new_paths: List[str]): + """ + Write new repo paths to file + """ + existing_paths = set(repos.values()) + new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p)) + new_paths = new_paths - existing_paths + if new_paths: + print(f"Found {len(new_paths)} new repo(s).") + new_repos = { + os.path.basename(os.path.normpath(path)): path + for path in new_paths} + write_to_repo_file(new_repos, 'a+') + else: + print('No new repos found!') + + +async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]: + """ + Run `cmds` asynchronously in `path` directory. Return the `path` if + execution fails. + """ + process = await asyncio.create_subprocess_exec( + *cmds, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + start_new_session=True, + cwd=path) + stdout, stderr = await process.communicate() + for pipe in (stdout, stderr): + if pipe: + print(format_output(pipe.decode(), f'{repo_name}: ')) + # The existence of stderr is not good indicator since git sometimes write + # to stderr even if the execution is successful, e.g. git fetch + if process.returncode != 0: + return path + + +def format_output(s: str, prefix: str): + """ + Prepends every line in given string with the given prefix. + """ + return ''.join([f'{prefix}{line}' for line in s.splitlines(keepends=True)]) + + +def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: + """ + Execute tasks asynchronously + """ + # TODO: asyncio API is nicer in python 3.7 + if platform.system() == 'Windows': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + + try: + errors = loop.run_until_complete(asyncio.gather(*tasks)) + finally: + loop.close() + return errors + + +def describe(repos: Dict[str, str]) -> str: + """ + Return the status of all repos + """ + if repos: + name_width = max(len(n) for n in repos) + 1 + funcs = info.get_info_funcs() + for name in sorted(repos): + path = repos[name] + display_items = ' '.join(f(path) for f in funcs) + yield f'{name:<{name_width}}{display_items}' + + +def get_cmds_from_files() -> Dict[str, Dict[str, str]]: + """ + Parse delegated git commands from default config file + and custom config file. + + Example return + { + 'branch': {'help': 'show local branches'}, + 'clean': {'cmd': 'clean -dfx', + 'help': 'remove all untracked files/folders'}, + } + """ + # default config file + fname = os.path.join(os.path.dirname(__file__), "cmds.yml") + with open(fname, 'r') as stream: + cmds = yaml.load(stream, Loader=yaml.FullLoader) + + # custom config file + root = common.get_config_dir() + fname = os.path.join(root, 'cmds.yml') + custom_cmds = {} + if os.path.isfile(fname) and os.path.getsize(fname): + with open(fname, 'r') as stream: + custom_cmds = yaml.load(stream, Loader=yaml.FullLoader) + + # custom commands shadow default ones + cmds.update(custom_cmds) + return cmds -- cgit v1.2.3