From 715f81ffde428759c902cdd16c9b8d03f2bdf463 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 2 Nov 2020 16:57:16 +0100 Subject: Adding upstream version 0.11.9. Signed-off-by: Daniel Baumann --- gita/__main__.py | 193 +++++++++++++++++++++++++++++++++++++++++++------------ gita/common.py | 8 +++ gita/info.py | 108 ++++++++++++++++++++++--------- gita/utils.py | 41 ++++++++---- 4 files changed, 265 insertions(+), 85 deletions(-) (limited to 'gita') diff --git a/gita/__main__.py b/gita/__main__.py index 6e47f8e..9a24bb9 100644 --- a/gita/__main__.py +++ b/gita/__main__.py @@ -15,16 +15,23 @@ https://github.com/nosarthur/gita/blob/master/.gita-completion.bash ''' import os +import sys +import yaml import argparse import subprocess import pkg_resources +from itertools import chain +from pathlib import Path -from . import utils, info +from . import utils, info, common def f_add(args: argparse.Namespace): repos = utils.get_repos() - utils.add_repos(repos, args.paths) + paths = args.paths + if args.recursive: + paths = chain.from_iterable(Path(p).glob('**') for p in args.paths) + utils.add_repos(repos, paths) def f_rename(args: argparse.Namespace): @@ -32,12 +39,33 @@ def f_rename(args: argparse.Namespace): utils.rename_repo(repos, args.repo[0], args.new_name) -def f_info(_): - all_items, to_display = info.get_info_items() - print('In use:', ','.join(to_display)) - unused = set(all_items) - set(to_display) - if unused: - print('Unused:', ' '.join(unused)) +def f_color(args: argparse.Namespace): + cmd = args.color_cmd or 'll' + if cmd == 'll': # pragma: no cover + info.show_colors() + elif cmd == 'set': + print('not implemented') + + +def f_info(args: argparse.Namespace): + to_display = info.get_info_items() + cmd = args.info_cmd or 'll' + if cmd == 'll': + print('In use:', ','.join(to_display)) + unused = set(info.ALL_INFO_ITEMS) - set(to_display) + if unused: + print('Unused:', ' '.join(unused)) + return + if cmd == 'add' and args.info_item not in to_display: + to_display.append(args.info_item) + yml_config = common.get_config_fname('info.yml') + with open(yml_config, 'w') as f: + yaml.dump(to_display, f, default_flow_style=None) + elif cmd == 'rm' and args.info_item in to_display: + to_display.remove(args.info_item) + yml_config = common.get_config_fname('info.yml') + with open(yml_config, 'w') as f: + yaml.dump(to_display, f, default_flow_style=None) def f_ll(args: argparse.Namespace): @@ -45,10 +73,13 @@ def f_ll(args: argparse.Namespace): Display details of all repos """ repos = utils.get_repos() + ctx = utils.get_context() + if args.group is None and ctx: + args.group = ctx.stem if args.group: # only display repos in this group group_repos = utils.get_groups()[args.group] repos = {k: repos[k] for k in group_repos if k in repos} - for line in utils.describe(repos): + for line in utils.describe(repos, no_colors=args.no_colors): print(line) @@ -62,8 +93,26 @@ def f_ls(args: argparse.Namespace): def f_group(args: argparse.Namespace): groups = utils.get_groups() - if args.to_group: - gname = input('group name? ') + cmd = args.group_cmd or 'll' + if cmd == 'll': + for group, repos in groups.items(): + print(f"{group}: {' '.join(repos)}") + elif cmd == 'ls': + print(' '.join(groups)) + elif cmd == 'rename': + new_name = args.new_name + if new_name in groups: + sys.exit(f'{new_name} already exists.') + gname = args.gname + groups[new_name] = groups[gname] + del groups[gname] + utils.write_to_groups_file(groups, 'w') + elif cmd == 'rm': + for name in args.to_ungroup: + del groups[name] + utils.write_to_groups_file(groups, 'w') + elif cmd == 'add': + gname = args.gname if gname in groups: gname_repos = set(groups[gname]) gname_repos.update(args.to_group) @@ -71,31 +120,32 @@ def f_group(args: argparse.Namespace): utils.write_to_groups_file(groups, 'w') else: utils.write_to_groups_file({gname: sorted(args.to_group)}, 'a+') - else: - for group, repos in groups.items(): - print(f"{group}: {' '.join(repos)}") -def f_ungroup(args: argparse.Namespace): - groups = utils.get_groups() - to_ungroup = set(args.to_ungroup) - to_del = [] - for name, repos in groups.items(): - remaining = set(repos) - to_ungroup - if remaining: - groups[name] = list(sorted(remaining)) +def f_context(args: argparse.Namespace): + choice = args.choice + ctx = utils.get_context() + if choice is None: + if ctx: + group = ctx.stem + print(f"{group}: {' '.join(utils.get_groups()[group])}") + else: + print('Context is not set') + elif choice == 'none': # remove context + ctx and ctx.unlink() + else: # set context + fname = Path(common.get_config_dir()) / (choice + '.context') + if ctx: + ctx.rename(fname) else: - to_del.append(name) - for name in to_del: - del groups[name] - utils.write_to_groups_file(groups, 'w') + open(fname, 'w').close() def f_rm(args: argparse.Namespace): """ Unregister repo(s) from gita """ - path_file = utils.get_config_fname('repo_path') + path_file = common.get_config_fname('repo_path') if os.path.isfile(path_file): repos = utils.get_repos() for repo in args.repo: @@ -110,6 +160,9 @@ def f_git_cmd(args: argparse.Namespace): """ repos = utils.get_repos() groups = utils.get_groups() + ctx = utils.get_context() + if not args.repo and ctx: + args.repo = [ctx.stem] if args.repo: # with user specified repo(s) or group(s) chosen = {} for k in args.repo: @@ -170,6 +223,8 @@ def main(argv=None): # bookkeeping sub-commands p_add = subparsers.add_parser('add', help='add repo(s)') p_add.add_argument('paths', nargs='+', help="add repo(s)") + p_add.add_argument('-r', dest='recursive', action='store_true', + help="recursively add repo(s) in the given path.") p_add.set_defaults(func=f_add) p_rm = subparsers.add_parser('rm', help='remove repo(s)') @@ -190,8 +245,38 @@ def main(argv=None): help="new name") p_rename.set_defaults(func=f_rename) - p_info = subparsers.add_parser('info', help='show information items of the ll sub-command') + p_color = subparsers.add_parser('color', + help='display and modify branch coloring of the ll sub-command.') + p_color.set_defaults(func=f_color) + color_cmds = p_color.add_subparsers(dest='color_cmd', + help='additional help with sub-command -h') + color_cmds.add_parser('ll', + description='display available colors and the current branch coloring in the ll sub-command') + pc_set = color_cmds.add_parser('set', + description='Set color for local/remote situation.') + pc_set.add_argument('situation', + choices=info.get_color_encoding(), + help="5 possible local/remote situations") + pc_set.add_argument('color', + choices=[c.name for c in info.Color], + help="available colors") + + p_info = subparsers.add_parser('info', + help='list, add, or remove information items of the ll sub-command.') p_info.set_defaults(func=f_info) + info_cmds = p_info.add_subparsers(dest='info_cmd', + help='additional help with sub-command -h') + info_cmds.add_parser('ll', + description='show used and unused information items of the ll sub-command') + info_cmds.add_parser('add', description='Enable information item.' + ).add_argument('info_item', + choices=('branch', 'commit_msg', 'path'), + help="information item to add") + info_cmds.add_parser('rm', description='Disable information item.' + ).add_argument('info_item', + choices=('branch', 'commit_msg', 'path'), + help="information item to delete") + ll_doc = f''' status symbols: +: staged changes @@ -212,8 +297,19 @@ def main(argv=None): nargs='?', choices=utils.get_groups(), help="show repos in the chosen group") + p_ll.add_argument('-n', '--no-colors', action='store_true', + help='Disable coloring on the branch names.') p_ll.set_defaults(func=f_ll) + p_context = subparsers.add_parser('context', + help='Set and remove context. A context is a group.' + ' When set, all operations apply only to repos in that group.') + p_context.add_argument('choice', + nargs='?', + choices=set().union(utils.get_groups(), {'none'}), + help="Without argument, show current context. Otherwise choose a group as context. To remove context, use 'none'. ") + p_context.set_defaults(func=f_context) + p_ls = subparsers.add_parser( 'ls', help='display names of all repos, or path of a chosen repo') p_ls.add_argument('repo', @@ -223,21 +319,34 @@ def main(argv=None): p_ls.set_defaults(func=f_ls) p_group = subparsers.add_parser( - 'group', help='group repos or display names of all groups if no repo is provided') - p_group.add_argument('to_group', - nargs='*', - choices=utils.get_choices(), - help="repo(s) to be grouped") + 'group', help='list, add, or remove repo group(s)') p_group.set_defaults(func=f_group) - - p_ungroup = subparsers.add_parser( - 'ungroup', help='remove group information for repos', - description="Remove group information on repos") - p_ungroup.add_argument('to_ungroup', - nargs='+', - choices=utils.get_repos(), - help="repo(s) to be ungrouped") - p_ungroup.set_defaults(func=f_ungroup) + group_cmds = p_group.add_subparsers(dest='group_cmd', + help='additional help with sub-command -h') + group_cmds.add_parser('ll', description='List all groups with repos.') + group_cmds.add_parser('ls', description='List all group names.') + pg_add = group_cmds.add_parser('add', description='Add repo(s) to a group.') + pg_add.add_argument('to_group', + nargs='+', + metavar='repo', + choices=utils.get_repos(), + help="repo(s) to be grouped") + pg_add.add_argument('-n', '--name', + dest='gname', + metavar='group-name', + required=True, + help="group name") + pg_rename = group_cmds.add_parser('rename', description='Change group name.') + pg_rename.add_argument('gname', metavar='group-name', + choices=utils.get_groups(), + help="existing group to rename") + pg_rename.add_argument('new_name', metavar='new-name', + help="new group name") + group_cmds.add_parser('rm', + description='Remove group(s).').add_argument('to_ungroup', + nargs='+', + choices=utils.get_groups(), + help="group(s) to delete") # superman mode p_super = subparsers.add_parser( diff --git a/gita/common.py b/gita/common.py index 0a271fc..ef3933d 100644 --- a/gita/common.py +++ b/gita/common.py @@ -6,3 +6,11 @@ def get_config_dir() -> str: os.path.expanduser('~'), '.config') root = os.path.join(parent, "gita") return root + + +def get_config_fname(fname: str) -> str: + """ + Return the file name that stores the repo locations. + """ + root = get_config_dir() + return os.path.join(root, fname) diff --git a/gita/info.py b/gita/info.py index 18d20fd..473127a 100644 --- a/gita/info.py +++ b/gita/info.py @@ -2,14 +2,19 @@ import os import sys import yaml import subprocess +from enum import Enum +from pathlib import Path +from functools import lru_cache from typing import Tuple, List, Callable, Dict + from . import common -class Color: +class Color(str, Enum): """ Terminal color """ + black = '\x1b[30m' red = '\x1b[31m' # local diverges from remote green = '\x1b[32m' # local == remote yellow = '\x1b[33m' # local is behind @@ -18,6 +23,43 @@ class Color: cyan = '\x1b[36m' white = '\x1b[37m' # no remote branch end = '\x1b[0m' + b_black = '\x1b[30;1m' + b_red = '\x1b[31;1m' + b_green = '\x1b[32;1m' + b_yellow = '\x1b[33;1m' + b_blue = '\x1b[34;1m' + b_purple = '\x1b[35;1m' + b_cyan = '\x1b[36;1m' + b_white = '\x1b[37;1m' + + +def show_colors(): # pragma: no cover + """ + + """ + for i, c in enumerate(Color, start=1): + if c != Color.end: + print(f'{c.value}{c.name:<8} ', end='') + if i % 9 == 0: + print() + print(f'{Color.end}') + for situation, c in get_color_encoding().items(): + print(f'{situation:<12}: {c.value}{c.name:<8}{Color.end} ') + + +@lru_cache() +def get_color_encoding(): + """ + + """ + # TODO: add config file + return { + 'no-remote': Color.white, + 'in-sync': Color.green, + 'diverged': Color.red, + 'local-ahead': Color.purple, + 'remote-ahead': Color.yellow, + } def get_info_funcs() -> List[Callable[[str], str]]: @@ -26,35 +68,30 @@ def get_info_funcs() -> List[Callable[[str], str]]: take the repo path as input and return the corresponding information as str. See `get_path`, `get_repo_status`, `get_common_commit` for examples. """ - info_items, to_display = get_info_items() - return [info_items[k] for k in to_display] + to_display = get_info_items() + # This re-definition is to make unit test mocking to work + all_info_items = { + 'branch': get_repo_status, + 'commit_msg': get_commit_msg, + 'path': get_path, + } + return [all_info_items[k] for k in to_display] -def get_info_items() -> Tuple[Dict[str, Callable[[str], str]], List[str]]: +def get_info_items() -> List[str]: """ - Return the available information items for display in the `gita ll` - sub-command, and the ones to be displayed. - It loads custom information functions and configuration if they exist. + Return the information items to be displayed in the `gita ll` command. """ # default settings - info_items = {'branch': get_repo_status, - 'commit_msg': get_commit_msg, - 'path': get_path, } display_items = ['branch', 'commit_msg'] # custom settings - root = common.get_config_dir() - src_fname = os.path.join(root, 'extra_repo_info.py') - yml_fname = os.path.join(root, 'info.yml') - if os.path.isfile(src_fname): - sys.path.append(root) - from extra_repo_info import extra_info_items - info_items.update(extra_info_items) - if os.path.isfile(yml_fname): - with open(yml_fname, 'r') as stream: + yml_config = Path(common.get_config_fname('info.yml')) + if yml_config.is_file(): + with open(yml_config, 'r') as stream: display_items = yaml.load(stream, Loader=yaml.FullLoader) - display_items = [x for x in display_items if x in info_items] - return info_items, display_items + display_items = [x for x in display_items if x in ALL_INFO_ITEMS] + return display_items def get_path(path): @@ -113,13 +150,15 @@ def get_commit_msg(path: str) -> str: return result.stdout.strip() -def get_repo_status(path: str) -> str: +def get_repo_status(path: str, no_colors=False) -> str: head = get_head(path) - dirty, staged, untracked, color = _get_repo_status(path) - return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}' + dirty, staged, untracked, color = _get_repo_status(path, no_colors) + if color: + return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}' + return f'{head+" "+dirty+staged+untracked:<10}' -def _get_repo_status(path: str) -> Tuple[str]: +def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]: """ Return the status of one repo """ @@ -128,19 +167,30 @@ def _get_repo_status(path: str) -> Tuple[str]: staged = '+' if run_quiet_diff(['--cached']) else '' untracked = '_' if has_untracked() else '' + if no_colors: + return dirty, staged, untracked, '' + + colors = get_color_encoding() diff_returncode = run_quiet_diff(['@{u}', '@{0}']) has_no_remote = diff_returncode == 128 has_no_diff = diff_returncode == 0 if has_no_remote: - color = Color.white + color = colors['no-remote'] elif has_no_diff: - color = Color.green + color = colors['in-sync'] else: common_commit = get_common_commit() outdated = run_quiet_diff(['@{u}', common_commit]) if outdated: diverged = run_quiet_diff(['@{0}', common_commit]) - color = Color.red if diverged else Color.yellow + color = colors['diverged'] if diverged else colors['remote-ahead'] else: # local is ahead of remote - color = Color.purple + color = colors['local-ahead'] return dirty, staged, untracked, color + + +ALL_INFO_ITEMS = { + 'branch': get_repo_status, + 'commit_msg': get_commit_msg, + 'path': get_path, + } diff --git a/gita/utils.py b/gita/utils.py index d14484a..d30a82e 100644 --- a/gita/utils.py +++ b/gita/utils.py @@ -2,19 +2,23 @@ import os import yaml import asyncio import platform -from functools import lru_cache +from functools import lru_cache, partial +from pathlib import Path from typing import List, Dict, Coroutine, Union from . import info from . import common -def get_config_fname(fname: str) -> str: +@lru_cache() +def get_context() -> Union[Path, None]: """ - Return the file name that stores the repo locations. + Return the context: either a group name or 'none' """ - root = common.get_config_dir() - return os.path.join(root, fname) + config_dir = Path(common.get_config_dir()) + matches = list(config_dir.glob('*.context')) + assert len(matches) < 2, "Cannot have multiple .context file" + return matches[0] if matches else None @lru_cache() @@ -22,7 +26,7 @@ def get_repos() -> Dict[str, str]: """ Return a `dict` of repo name to repo absolute path """ - path_file = get_config_fname('repo_path') + path_file = common.get_config_fname('repo_path') repos = {} # Each line is a repo path and repo name separated by , if os.path.isfile(path_file) and os.stat(path_file).st_size > 0: @@ -47,7 +51,7 @@ def get_groups() -> Dict[str, List[str]]: """ Return a `dict` of group name to repo names. """ - fname = get_config_fname('groups.yml') + fname = common.get_config_fname('groups.yml') groups = {} # Each line is a repo path and repo name separated by , if os.path.isfile(fname) and os.stat(fname).st_size > 0: @@ -102,7 +106,7 @@ def write_to_repo_file(repos: Dict[str, str], mode: str): """ """ data = ''.join(f'{path},{name}\n' for name, path in repos.items()) - fname = get_config_fname('repo_path') + fname = common.get_config_fname('repo_path') os.makedirs(os.path.dirname(fname), exist_ok=True) with open(fname, mode) as f: f.write(data) @@ -112,10 +116,13 @@ def write_to_groups_file(groups: Dict[str, List[str]], mode: str): """ """ - fname = get_config_fname('groups.yml') + fname = common.get_config_fname('groups.yml') os.makedirs(os.path.dirname(fname), exist_ok=True) - with open(fname, mode) as f: - yaml.dump(groups, f, default_flow_style=None) + if not groups: # all groups are deleted + open(fname, 'w').close() + else: + with open(fname, mode) as f: + yaml.dump(groups, f, default_flow_style=None) def add_repos(repos: Dict[str, str], new_paths: List[str]): @@ -182,17 +189,23 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: return errors -def describe(repos: Dict[str, str]) -> str: +def describe(repos: Dict[str, str], no_colors: bool=False) -> str: """ Return the status of all repos """ if repos: name_width = max(len(n) for n in repos) + 1 funcs = info.get_info_funcs() + + get_repo_status = info.get_repo_status + if get_repo_status in funcs and no_colors: + idx = funcs.index(get_repo_status) + funcs[idx] = partial(get_repo_status, no_colors=True) + for name in sorted(repos): path = repos[name] - display_items = ' '.join(f(path) for f in funcs) - yield f'{name:<{name_width}}{display_items}' + info_items = ' '.join(f(path) for f in funcs) + yield f'{name:<{name_width}}{info_items}' def get_cmds_from_files() -> Dict[str, Dict[str, str]]: -- cgit v1.2.3