From d6fb6e20014db80336431d38b9ee4d1ab233e905 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 17 Jul 2021 09:26:30 +0200 Subject: Adding upstream version 0.15.1. Signed-off-by: Daniel Baumann --- gita/__main__.py | 235 ++++++++++++++++++++++++++++++++++++++----------- gita/cmds.json | 89 +++++++++++++++++++ gita/cmds.yml | 65 -------------- gita/common.py | 17 ++-- gita/info.py | 103 +++++++++++++--------- gita/utils.py | 261 +++++++++++++++++++++++++++++++++++++++++-------------- 6 files changed, 546 insertions(+), 224 deletions(-) create mode 100644 gita/cmds.json delete mode 100644 gita/cmds.yml (limited to 'gita') diff --git a/gita/__main__.py b/gita/__main__.py index beecab2..ee4e7e7 100644 --- a/gita/__main__.py +++ b/gita/__main__.py @@ -16,22 +16,52 @@ https://github.com/nosarthur/gita/blob/master/.gita-completion.bash import os import sys -import yaml +import csv import argparse import subprocess import pkg_resources from itertools import chain from pathlib import Path +import glob from . import utils, info, common +def _group_name(name: str) -> str: + """ + + """ + repos = utils.get_repos() + if name in repos: + print(f"Cannot use group name {name} since it's a repo name.") + sys.exit(1) + return name + + def f_add(args: argparse.Namespace): repos = utils.get_repos() paths = args.paths - if args.recursive: - paths = chain.from_iterable(Path(p).glob('**') for p in args.paths) - utils.add_repos(repos, paths) + if args.main: + # add to global and tag as main + main_repos = utils.add_repos(repos, paths, repo_type='m') + # add sub-repo recursively and save to local config + for name, prop in main_repos.items(): + main_path = prop['path'] + print('Inside main repo:', name) + #sub_paths = Path(main_path).glob('**') + sub_paths = glob.glob(os.path.join(main_path,'**/'), recursive=True) + utils.add_repos({}, sub_paths, root=main_path) + else: + if args.recursive or args.auto_group: + paths = chain.from_iterable( + glob.glob(os.path.join(p, '**/'), recursive=True) + for p in args.paths) + new_repos = utils.add_repos(repos, paths, is_bare=args.bare) + if args.auto_group: + new_groups = utils.auto_group(new_repos, args.paths) + if new_groups: + print(f'Created {len(new_groups)} new group(s).') + utils.write_to_groups_file(new_groups, 'a+') def f_rename(args: argparse.Namespace): @@ -39,16 +69,32 @@ def f_rename(args: argparse.Namespace): utils.rename_repo(repos, args.repo[0], args.new_name) +def f_flags(args: argparse.Namespace): + cmd = args.flags_cmd or 'll' + repos = utils.get_repos() + if cmd == 'll': + for r, prop in repos.items(): + if prop['flags']: + print(f"{r}: {prop['flags']}") + elif cmd == 'set': + # when in memory, flags are List[str], when on disk, they are space + # delimited str + repos[args.repo]['flags'] = args.flags + utils.write_to_repo_file(repos, 'w') + + def f_color(args: argparse.Namespace): cmd = args.color_cmd or 'll' if cmd == 'll': # pragma: no cover info.show_colors() elif cmd == 'set': colors = info.get_color_encoding() - colors[args.situation] = info.Color[args.color].value - yml_config = common.get_config_fname('color.yml') - with open(yml_config, 'w') as f: - yaml.dump(colors, f, default_flow_style=None) + colors[args.situation] = args.color + csv_config = common.get_config_fname('color.csv') + with open(csv_config, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=colors) + writer.writeheader() + writer.writerow(colors) def f_info(args: argparse.Namespace): @@ -56,37 +102,53 @@ def f_info(args: argparse.Namespace): cmd = args.info_cmd or 'll' if cmd == 'll': print('In use:', ','.join(to_display)) - unused = set(info.ALL_INFO_ITEMS) - set(to_display) + unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display))) if unused: - print('Unused:', ' '.join(unused)) + print('Unused:', ','.join(unused)) return if cmd == 'add' and args.info_item not in to_display: to_display.append(args.info_item) - yml_config = common.get_config_fname('info.yml') - with open(yml_config, 'w') as f: - yaml.dump(to_display, f, default_flow_style=None) + csv_config = common.get_config_fname('info.csv') + with open(csv_config, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(to_display) elif cmd == 'rm' and args.info_item in to_display: to_display.remove(args.info_item) - yml_config = common.get_config_fname('info.yml') - with open(yml_config, 'w') as f: - yaml.dump(to_display, f, default_flow_style=None) + csv_config = common.get_config_fname('info.csv') + with open(csv_config, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(to_display) def f_clone(args: argparse.Namespace): path = Path.cwd() - errors = utils.exec_async_tasks( + if args.preserve_path: + utils.exec_async_tasks( + utils.run_async(repo_name, path, ['git', 'clone', url, abs_path]) + for url, repo_name, abs_path in utils.parse_clone_config(args.fname)) + else: + utils.exec_async_tasks( utils.run_async(repo_name, path, ['git', 'clone', url]) for url, repo_name, _ in utils.parse_clone_config(args.fname)) def f_freeze(_): repos = utils.get_repos() - for name, path in repos.items(): + seen = {''} + for name, prop in repos.items(): + path = prop['path'] + # TODO: What do we do with main repos? Maybe give an option to print + # their sub-repos too. url = '' cp = subprocess.run(['git', 'remote', '-v'], cwd=path, capture_output=True) - if cp.returncode == 0: - url = cp.stdout.decode('utf-8').split('\n')[0].split()[1] - print(f'{url},{name},{path}') + lines = cp.stdout.decode('utf-8').split('\n') + if cp.returncode == 0 and len(lines) > 0: + parts = lines[0].split() + if len(parts)>1: + url = parts[1] + if url not in seen: + seen.add(url) + print(f'{url},{name},{path}') def f_ll(args: argparse.Namespace): @@ -107,7 +169,7 @@ def f_ll(args: argparse.Namespace): def f_ls(args: argparse.Namespace): repos = utils.get_repos() if args.repo: # one repo, show its path - print(repos[args.repo]) + print(repos[args.repo]['path']) else: # show names of all repos print(' '.join(repos)) @@ -128,6 +190,11 @@ def f_group(args: argparse.Namespace): groups[new_name] = groups[gname] del groups[gname] utils.write_to_groups_file(groups, 'w') + # change context + ctx = utils.get_context() + if ctx and str(ctx.stem) == gname: + # ctx.rename(ctx.with_stem(new_name)) # only works in py3.9 + ctx.rename(ctx.with_name(f'{new_name}.context')) elif cmd == 'rm': ctx = utils.get_context() for name in args.to_ungroup: @@ -178,12 +245,22 @@ def f_rm(args: argparse.Namespace): """ Unregister repo(s) from gita """ - path_file = common.get_config_fname('repo_path') + path_file = common.get_config_fname('repos.csv') if os.path.isfile(path_file): repos = utils.get_repos() + main_paths = [prop['path'] for prop in repos.values() if prop['type'] == 'm'] + # TODO: add test case to delete main repo from main repo + # only local setting should be affected instead of the global one for repo in args.repo: del repos[repo] - utils.write_to_repo_file(repos, 'w') + # If cwd is relative to any main repo, write to local config + cwd = os.getcwd() + for p in main_paths: + if utils.is_relative_to(cwd, p): + utils.write_to_repo_file(repos, 'w', p) + break + else: # global config + utils.write_to_repo_file(repos, 'w') def f_git_cmd(args: argparse.Namespace): @@ -205,21 +282,33 @@ def f_git_cmd(args: argparse.Namespace): for r in groups[k]: chosen[r] = repos[r] repos = chosen - cmds = ['git'] + args.cmd - if len(repos) == 1 or cmds[1] in args.async_blacklist: - for path in repos.values(): + per_repo_cmds = [] + for prop in repos.values(): + cmds = args.cmd.copy() + if cmds[0] == 'git' and prop['flags']: + cmds[1:1] = prop['flags'] + per_repo_cmds.append(cmds) + + # This async blacklist mechanism is broken if the git command name does + # not match with the gita command name. + if len(repos) == 1 or args.cmd[1] in args.async_blacklist: + for prop, cmds in zip(repos.values(), per_repo_cmds): + path = prop['path'] print(path) - subprocess.run(cmds, cwd=path) + subprocess.run(cmds, cwd=path, shell=args.shell) else: # run concurrent subprocesses # Async execution cannot deal with multiple repos' user name/password. # Here we shut off any user input in the async execution, and re-run # the failed ones synchronously. errors = utils.exec_async_tasks( - utils.run_async(repo_name, path, cmds) for repo_name, path in repos.items()) + utils.run_async(repo_name, prop['path'], cmds) + for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items())) for path in errors: if path: print(path) - subprocess.run(cmds, cwd=path) + # FIXME: This is broken, flags are missing. But probably few + # people will use `gita flags` + subprocess.run(args.cmd, cwd=path) def f_shell(args): @@ -249,10 +338,10 @@ def f_shell(args): for r in groups[k]: chosen[r] = repos[r] repos = chosen - cmds = args.man[i:] - for name, path in repos.items(): + cmds = ' '.join(args.man[i:]) # join the shell command into a single string + for name, prop in repos.items(): # TODO: pull this out as a function - got = subprocess.run(cmds, cwd=path, check=True, + got = subprocess.run(cmds, cwd=prop['path'], check=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) print(utils.format_output(got.stdout.decode(), name)) @@ -271,8 +360,9 @@ def f_super(args): names.append(word) else: break - args.cmd = args.man[i:] + args.cmd = ['git'] + args.man[i:] args.repo = names + args.shell = False f_git_cmd(args) @@ -292,9 +382,17 @@ def main(argv=None): # bookkeeping sub-commands p_add = subparsers.add_parser('add', description='add repo(s)', help='add repo(s)') - p_add.add_argument('paths', nargs='+', help="repo(s) to add") - p_add.add_argument('-r', dest='recursive', action='store_true', - help="recursively add repo(s) in the given path.") + p_add.add_argument('paths', nargs='+', type=os.path.abspath, help="repo(s) to add") + xgroup = p_add.add_mutually_exclusive_group() + xgroup.add_argument('-r', '--recursive', action='store_true', + help="recursively add repo(s) in the given path(s).") + xgroup.add_argument('-m', '--main', action='store_true', + help="make main repo(s), sub-repos are recursively added.") + xgroup.add_argument('-a', '--auto-group', action='store_true', + help="recursively add repo(s) in the given path(s) " + "and create hierarchical groups based on folder structure.") + xgroup.add_argument('-b', '--bare', action='store_true', + help="add bare repo(s)") p_add.set_defaults(func=f_add) p_rm = subparsers.add_parser('rm', description='remove repo(s)', @@ -305,15 +403,22 @@ def main(argv=None): help="remove the chosen repo(s)") p_rm.set_defaults(func=f_rm) - p_freeze = subparsers.add_parser('freeze', description='print all repo information') + p_freeze = subparsers.add_parser('freeze', + description='print all repo information', + help='print all repo information') p_freeze.set_defaults(func=f_freeze) - p_clone = subparsers.add_parser('clone', description='clone repos from config file') + p_clone = subparsers.add_parser('clone', + description='clone repos from config file', + help='clone repos from config file') p_clone.add_argument('fname', help='config file. Its content should be the output of `gita freeze`.') + p_clone.add_argument('-p', '--preserve-path', dest='preserve_path', action='store_true', + help="clone repo(s) in their original paths") p_clone.set_defaults(func=f_clone) - p_rename = subparsers.add_parser('rename', description='rename a repo') + p_rename = subparsers.add_parser('rename', description='rename a repo', + help='rename a repo') p_rename.add_argument( 'repo', nargs=1, @@ -322,8 +427,25 @@ def main(argv=None): p_rename.add_argument('new_name', help="new name") p_rename.set_defaults(func=f_rename) + p_flags = subparsers.add_parser('flags', + description='Set custom git flags for repo.', + help='git flags configuration') + p_flags.set_defaults(func=f_flags) + flags_cmds = p_flags.add_subparsers(dest='flags_cmd', + help='additional help with sub-command -h') + flags_cmds.add_parser('ll', + description='display repos with custom flags') + pf_set = flags_cmds.add_parser('set', + description='Set flags for repo.') + pf_set.add_argument('repo', choices=utils.get_repos(), + help="repo name") + pf_set.add_argument('flags', + nargs=argparse.REMAINDER, + help="custom flags, use quotes") + p_color = subparsers.add_parser('color', - description='display and modify branch coloring of the ll sub-command.') + description='display and modify branch coloring of the ll sub-command.', + help='color configuration') p_color.set_defaults(func=f_color) color_cmds = p_color.add_subparsers(dest='color_cmd', help='additional help with sub-command -h') @@ -339,7 +461,8 @@ def main(argv=None): help="available colors") p_info = subparsers.add_parser('info', - description='list, add, or remove information items of the ll sub-command.') + description='list, add, or remove information items of the ll sub-command.', + help='information setting') p_info.set_defaults(func=f_info) info_cmds = p_info.add_subparsers(dest='info_cmd', help='additional help with sub-command -h') @@ -347,11 +470,11 @@ def main(argv=None): description='show used and unused information items of the ll sub-command') info_cmds.add_parser('add', description='Enable information item.' ).add_argument('info_item', - choices=('branch', 'commit_msg', 'path'), + choices=info.ALL_INFO_ITEMS, help="information item to add") info_cmds.add_parser('rm', description='Disable information item.' ).add_argument('info_item', - choices=('branch', 'commit_msg', 'path'), + choices=info.ALL_INFO_ITEMS, help="information item to delete") @@ -379,6 +502,7 @@ def main(argv=None): p_ll.set_defaults(func=f_ll) p_context = subparsers.add_parser('context', + help='set context', description='Set and remove context. A context is a group.' ' When set, all operations apply only to repos in that group.') p_context.add_argument('choice', @@ -388,7 +512,8 @@ def main(argv=None): p_context.set_defaults(func=f_context) p_ls = subparsers.add_parser( - 'ls', description='display names of all repos, or path of a chosen repo') + 'ls', help='show repo(s) or repo path', + description='display names of all repos, or path of a chosen repo') p_ls.add_argument('repo', nargs='?', choices=utils.get_repos(), @@ -396,7 +521,8 @@ def main(argv=None): p_ls.set_defaults(func=f_ls) p_group = subparsers.add_parser( - 'group', description='list, add, or remove repo group(s)') + 'group', description='list, add, or remove repo group(s)', + help='group repos') p_group.set_defaults(func=f_group) group_cmds = p_group.add_subparsers(dest='group_cmd', help='additional help with sub-command -h') @@ -410,6 +536,7 @@ def main(argv=None): help="repo(s) to be grouped") pg_add.add_argument('-n', '--name', dest='gname', + type=_group_name, metavar='group-name', required=True, help="group name") @@ -429,6 +556,7 @@ def main(argv=None): choices=utils.get_groups(), help="existing group to rename") pg_rename.add_argument('new_name', metavar='new-name', + type=_group_name, help="new group name") group_cmds.add_parser('rm', description='Remove group(s).').add_argument('to_ungroup', @@ -439,6 +567,7 @@ def main(argv=None): # superman mode p_super = subparsers.add_parser( 'super', + help='run any git command/alias', description='Superman mode: delegate any git command/alias in specified or ' 'all repo(s).\n' 'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n' @@ -446,7 +575,7 @@ def main(argv=None): p_super.add_argument( 'man', nargs=argparse.REMAINDER, - help="execute arbitrary git command/alias for specified or all repos " + help="execute arbitrary git command/alias for specified or all repos\n" "Example: gita super myrepo1 diff --name-only --staged " "Another: gita super checkout master ") p_super.set_defaults(func=f_super) @@ -454,6 +583,7 @@ def main(argv=None): # shell mode p_shell = subparsers.add_parser( 'shell', + help='run any shell command', description='shell mode: delegate any shell command in specified or ' 'all repo(s).\n' 'Examples:\n \t gita shell pwd\n' @@ -470,7 +600,7 @@ def main(argv=None): cmds = utils.get_cmds_from_files() for name, data in cmds.items(): help = data.get('help') - cmd = data.get('cmd') or name + cmd = data['cmd'] if data.get('allow_all'): choices = utils.get_choices() nargs = '*' @@ -481,7 +611,14 @@ def main(argv=None): help += ' for the chosen repo(s) or group(s)' sp = subparsers.add_parser(name, description=help) sp.add_argument('repo', nargs=nargs, choices=choices, help=help) - sp.set_defaults(func=f_git_cmd, cmd=cmd.split()) + is_shell = bool(data.get('shell')) + sp.add_argument('-s', '--shell', default=is_shell, type=bool, + help='If set, run in shell mode') + if is_shell: + cmd = [cmd] + else: + cmd = cmd.split() + sp.set_defaults(func=f_git_cmd, cmd=cmd) args = p.parse_args(argv) diff --git a/gita/cmds.json b/gita/cmds.json new file mode 100644 index 0000000..947a4dd --- /dev/null +++ b/gita/cmds.json @@ -0,0 +1,89 @@ +{ +"br":{ + "cmd": "git branch -vv", + "help":"show local branches"}, +"clean":{ + "cmd": "git clean -dfx", + "help": "remove all untracked files/folders"}, +"diff":{ + "cmd": "git diff", + "help": "git show differences"}, +"difftool":{ + "cmd": "git difftool", + "disable_async": true, + "help": "show differences using a tool" + }, +"fetch":{ + "cmd": "git fetch", + "allow_all": true, + "help": "fetch remote update" + }, +"last":{ + "cmd": "git log -1 HEAD", + "help": "show log information of HEAD" + }, +"log": + {"cmd": "git log", + "disable_async": true, + "help": "show logs" + }, +"merge":{ + "cmd": "git merge @{u}", + "help": "merge remote updates" + }, +"mergetool":{ + "cmd": "git mergetool", + "disable_async": true, + "help": "merge updates with a tool" + }, +"patch":{ + "cmd": "git format-patch HEAD~", + "help": "make a patch" + }, +"pull":{ + "cmd": "git pull", + "allow_all": true, + "help": "pull remote updates" + }, +"push":{ + "cmd": "git push", + "help": "push the local updates" + }, +"rebase":{ + "cmd": "git rebase", + "help": "rebase from master" + }, +"reflog":{ + "cmd": "git reflog", + "help": "show ref logs" + }, +"remote":{ + "cmd": "git remote -v", + "help": "show remote settings" + }, +"reset":{ + "cmd": "git reset", + "help": "reset repo(s)" + }, +"show":{ + "cmd": "git show", + "disable_async": true, + "help": "show detailed commit information" + }, +"stash":{ + "cmd": "git stash", + "help": "store uncommited changes" + }, +"stat":{ + "cmd": "git diff --stat", + "help": "show edit statistics" + }, +"st":{ + "cmd": "git status", + "help": "show status" + }, +"tag":{ + "cmd": "git tag -n", + "help": "show tags" + } +} diff --git a/gita/cmds.yml b/gita/cmds.yml deleted file mode 100644 index 8db932e..0000000 --- a/gita/cmds.yml +++ /dev/null @@ -1,65 +0,0 @@ -br: - cmd: branch -vv - help: show local branches -clean: - cmd: clean -dfx - help: remove all untracked files/folders -diff: - help: show differences -difftool: - disable_async: true - help: show differences using a tool -fetch: - allow_all: true - help: fetch remote update -last: - cmd: log -1 HEAD - help: show log information of HEAD -log: - disable_async: true - help: show logs -merge: - cmd: merge @{u} - help: merge remote updates -mergetool: - disable_async: true - help: merge updates with a tool -patch: - cmd: format-patch HEAD~ - help: make a patch -pull: - allow_all: true - help: pull remote updates -push: - help: push the local updates -rebase: - help: rebase from master -reflog: - help: show ref logs -remote: - cmd: remote -v - help: show remote settings -reset: - help: reset repo(s) -shortlog: - disable_async: true - help: show short log -show: - disable_async: true - help: show detailed commit information -show-branch: - disable_async: true - help: show detailed branch information -stash: - help: store uncommited changes -stat: - cmd: diff --stat - help: show edit statistics -st: - help: show status -tag: - cmd: tag -n - help: show tags -whatchanged: - disable_async: true - help: show detailed log diff --git a/gita/common.py b/gita/common.py index ef3933d..abbef5f 100644 --- a/gita/common.py +++ b/gita/common.py @@ -1,16 +1,17 @@ import os -def get_config_dir() -> str: - parent = os.environ.get('XDG_CONFIG_HOME') or os.path.join( - os.path.expanduser('~'), '.config') - root = os.path.join(parent, "gita") - return root +def get_config_dir(root=None) -> str: + if root is None: + root = os.environ.get('XDG_CONFIG_HOME') or os.path.join( + os.path.expanduser('~'), '.config') + return os.path.join(root, "gita") + else: + return os.path.join(root, ".gita") -def get_config_fname(fname: str) -> str: +def get_config_fname(fname: str, root=None) -> str: """ Return the file name that stores the repo locations. """ - root = get_config_dir() - return os.path.join(root, fname) + return os.path.join(get_config_dir(root), fname) diff --git a/gita/info.py b/gita/info.py index a8044e9..d18a097 100644 --- a/gita/info.py +++ b/gita/info.py @@ -1,5 +1,5 @@ import os -import sys +import csv import yaml import subprocess from enum import Enum @@ -31,40 +31,42 @@ class Color(str, Enum): b_purple = '\x1b[35;1m' b_cyan = '\x1b[36;1m' b_white = '\x1b[37;1m' + underline = '\x1B[4m' def show_colors(): # pragma: no cover """ """ - names = {c.value: c.name for c in Color} for i, c in enumerate(Color, start=1): - if c != Color.end: + if c != Color.end and c != Color.underline: print(f'{c.value}{c.name:<8} ', end='') if i % 9 == 0: print() print(f'{Color.end}') for situation, c in sorted(get_color_encoding().items()): - print(f'{situation:<12}: {c}{names[c]:<8}{Color.end} ') + print(f'{situation:<12}: {Color[c].value}{c:<8}{Color.end} ') @lru_cache() def get_color_encoding() -> Dict[str, str]: """ Return color scheme for different local/remote situations. + In the format of {situation: color name} """ # custom settings - yml_config = Path(common.get_config_fname('color.yml')) - if yml_config.is_file(): - with open(yml_config, 'r') as stream: - colors = yaml.load(stream, Loader=yaml.FullLoader) + csv_config = Path(common.get_config_fname('color.csv')) + if csv_config.is_file(): + with open(csv_config, 'r') as f: + reader = csv.DictReader(f) + colors = next(reader) else: colors = { - 'no-remote': Color.white.value, - 'in-sync': Color.green.value, - 'diverged': Color.red.value, - 'local-ahead': Color.purple.value, - 'remote-ahead': Color.yellow.value, + 'no-remote': Color.white.name, + 'in-sync': Color.green.name, + 'diverged': Color.red.name, + 'local-ahead': Color.purple.name, + 'remote-ahead': Color.yellow.name, } return colors @@ -80,6 +82,7 @@ def get_info_funcs() -> List[Callable[[str], str]]: all_info_items = { 'branch': get_repo_status, 'commit_msg': get_commit_msg, + 'commit_time': get_commit_time, 'path': get_path, } return [all_info_items[k] for k in to_display] @@ -90,23 +93,26 @@ def get_info_items() -> List[str]: Return the information items to be displayed in the `gita ll` command. """ # custom settings - yml_config = Path(common.get_config_fname('info.yml')) - if yml_config.is_file(): - with open(yml_config, 'r') as stream: - display_items = yaml.load(stream, Loader=yaml.FullLoader) + csv_config = Path(common.get_config_fname('info.csv')) + if csv_config.is_file(): + with open(csv_config, 'r') as f: + reader = csv.reader(f) + display_items = next(reader) display_items = [x for x in display_items if x in ALL_INFO_ITEMS] else: # default settings - display_items = ['branch', 'commit_msg'] + display_items = ['branch', 'commit_msg', 'commit_time'] return display_items -def get_path(path): - return f'{Color.cyan}{path}{Color.end}' +def get_path(prop: Dict[str, str]) -> str: + return f'{Color.cyan}{prop["path"]}{Color.end}' +# TODO: do we need to add the flags here too? def get_head(path: str) -> str: - result = subprocess.run('git rev-parse --abbrev-ref HEAD'.split(), + result = subprocess.run('git symbolic-ref -q --short HEAD || git describe --tags --exact-match', + shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True, @@ -114,12 +120,12 @@ def get_head(path: str) -> str: return result.stdout.strip() -def run_quiet_diff(args: List[str]) -> bool: +def run_quiet_diff(flags: List[str], args: List[str]) -> int: """ Return the return code of git diff `args` in quiet mode """ result = subprocess.run( - ['git', 'diff', '--quiet'] + args, + ['git'] + flags + ['diff', '--quiet'] + args, stderr=subprocess.DEVNULL, ) return result.returncode @@ -135,50 +141,68 @@ def get_common_commit() -> str: return result.stdout.strip() -def has_untracked() -> bool: +def has_untracked(flags: List[str]) -> bool: """ Return True if untracked file/folder exists """ - result = subprocess.run('git ls-files -zo --exclude-standard'.split(), + cmd = ['git'] + flags + 'ls-files -zo --exclude-standard'.split() + result = subprocess.run(cmd, stdout=subprocess.PIPE) return bool(result.stdout) -def get_commit_msg(path: str) -> str: +def get_commit_msg(prop: Dict[str, str]) -> str: """ Return the last commit message. """ # `git show-branch --no-name HEAD` is faster than `git show -s --format=%s` - result = subprocess.run('git show-branch --no-name HEAD'.split(), + cmd = ['git'] + prop['flags'] + 'show-branch --no-name HEAD'.split() + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True, - cwd=path) + cwd=prop['path']) return result.stdout.strip() -def get_repo_status(path: str, no_colors=False) -> str: - head = get_head(path) - dirty, staged, untracked, color = _get_repo_status(path, no_colors) +def get_commit_time(prop: Dict[str, str]) -> str: + """ + Return the last commit time in parenthesis. + """ + cmd = ['git'] + prop['flags'] + 'log -1 --format=%cd --date=relative'.split() + result = subprocess.run(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + universal_newlines=True, + cwd=prop['path']) + return f"({result.stdout.strip()})" + + +def get_repo_status(prop: Dict[str, str], no_colors=False) -> str: + head = get_head(prop['path']) + dirty, staged, untracked, color = _get_repo_status(prop, no_colors) if color: return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}' return f'{head+" "+dirty+staged+untracked:<10}' -def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]: +def _get_repo_status(prop: Dict[str, str], no_colors: bool) -> Tuple[str]: """ Return the status of one repo """ + path = prop['path'] + flags = prop['flags'] os.chdir(path) - dirty = '*' if run_quiet_diff([]) else '' - staged = '+' if run_quiet_diff(['--cached']) else '' - untracked = '_' if has_untracked() else '' + dirty = '*' if run_quiet_diff(flags, []) else '' + staged = '+' if run_quiet_diff(flags, ['--cached']) else '' + untracked = '_' if has_untracked(flags) else '' if no_colors: return dirty, staged, untracked, '' - colors = get_color_encoding() - diff_returncode = run_quiet_diff(['@{u}', '@{0}']) + colors = {situ: Color[name].value + for situ, name in get_color_encoding().items()} + diff_returncode = run_quiet_diff(flags, ['@{u}', '@{0}']) has_no_remote = diff_returncode == 128 has_no_diff = diff_returncode == 0 if has_no_remote: @@ -187,9 +211,9 @@ def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]: color = colors['in-sync'] else: common_commit = get_common_commit() - outdated = run_quiet_diff(['@{u}', common_commit]) + outdated = run_quiet_diff(flags, ['@{u}', common_commit]) if outdated: - diverged = run_quiet_diff(['@{0}', common_commit]) + diverged = run_quiet_diff(flags, ['@{0}', common_commit]) color = colors['diverged'] if diverged else colors['remote-ahead'] else: # local is ahead of remote color = colors['local-ahead'] @@ -199,5 +223,6 @@ def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]: ALL_INFO_ITEMS = { 'branch': get_repo_status, 'commit_msg': get_commit_msg, + 'commit_time': get_commit_time, 'path': get_path, } diff --git a/gita/utils.py b/gita/utils.py index 9572f02..34fc435 100644 --- a/gita/utils.py +++ b/gita/utils.py @@ -1,62 +1,76 @@ import os -import yaml +import json +import csv import asyncio import platform +import subprocess from functools import lru_cache, partial from pathlib import Path -from typing import List, Dict, Coroutine, Union, Iterator +from typing import List, Dict, Coroutine, Union, Iterator, Tuple +from collections import Counter, defaultdict from . import info from . import common -@lru_cache() -def get_context() -> Union[Path, None]: +# TODO: python3.9 pathlib has is_relative_to() function +def is_relative_to(kid: str, parent: str) -> bool: """ - Return the context: either a group name or 'none' + Both the `kid` and `parent` should be absolute path """ - config_dir = Path(common.get_config_dir()) - matches = list(config_dir.glob('*.context')) - assert len(matches) < 2, "Cannot have multiple .context file" - return matches[0] if matches else None + return parent == os.path.commonpath((kid, parent)) @lru_cache() -def get_repos() -> Dict[str, str]: +def get_repos(root=None) -> Dict[str, Dict[str, str]]: """ - Return a `dict` of repo name to repo absolute path + Return a `dict` of repo name to repo absolute path and repo type + + @param root: Use local config if set. If None, use either global or local + config depending on cwd. """ - path_file = common.get_config_fname('repo_path') + path_file = common.get_config_fname('repos.csv', root) repos = {} - # Each line is a repo path and repo name separated by , if os.path.isfile(path_file) and os.stat(path_file).st_size > 0: with open(path_file) as f: - for line in f: - line = line.rstrip() - if not line: # blank line - continue - path, name = line.split(',') - if not is_git(path): - continue - if name not in repos: - repos[name] = path - else: # repo name collision for different paths: include parent path name - par_name = os.path.basename(os.path.dirname(path)) - repos[os.path.join(par_name, name)] = path + rows = csv.DictReader(f, ['path', 'name', 'type', 'flags'], + restval='') # it's actually a reader + repos = {r['name']: + {'path': r['path'], 'type': r['type'], + 'flags': r['flags'].split()} + for r in rows if is_git(r['path'], is_bare=True)} + if root is None: # detect if inside a main path + cwd = os.getcwd() + for prop in repos.values(): + path = prop['path'] + if prop['type'] == 'm' and is_relative_to(cwd, path): + return get_repos(path) return repos +@lru_cache() +def get_context() -> Union[Path, None]: + """ + Return the context: either a group name or 'none' + """ + config_dir = Path(common.get_config_dir()) + matches = list(config_dir.glob('*.context')) + assert len(matches) < 2, "Cannot have multiple .context file" + return matches[0] if matches else None + + @lru_cache() def get_groups() -> Dict[str, List[str]]: """ Return a `dict` of group name to repo names. """ - fname = common.get_config_fname('groups.yml') + fname = common.get_config_fname('groups.csv') groups = {} # Each line is a repo path and repo name separated by , if os.path.isfile(fname) and os.stat(fname).st_size > 0: with open(fname, 'r') as f: - groups = yaml.load(f, Loader=yaml.FullLoader) + rows = csv.reader(f, delimiter=':') + groups = {r[0]: r[1].split() for r in rows} return groups @@ -75,10 +89,12 @@ def get_choices() -> List[Union[str, None]]: return choices -def is_git(path: str) -> bool: +def is_git(path: str, is_bare=False) -> bool: """ Return True if the path is a git repo. """ + if not os.path.exists(path): + return False # An alternative is to call `git rev-parse --is-inside-work-tree` # I don't see why that one is better yet. # For a regular git repo, .git is a folder, for a worktree repo, .git is a file. @@ -88,59 +104,172 @@ def is_git(path: str) -> bool: # `git rev-parse --git-common-dir` loc = os.path.join(path, '.git') # TODO: we can display the worktree repos in a different font. - return os.path.exists(loc) - - -def rename_repo(repos: Dict[str, str], repo: str, new_name: str): + if os.path.exists(loc): + return True + if not is_bare: + return False + # detect bare repo + got = subprocess.run('git rev-parse --is-bare-repository'.split(), + stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, + cwd=path + ) + if got.returncode == 0 and got.stdout == b'true\n': + return True + return False + +def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str): """ Write new repo name to file """ - path = repos[repo] + if new_name in repos: + print(f"{new_name} is already in use!") + return + prop = repos[repo] del repos[repo] - repos[new_name] = path - write_to_repo_file(repos, 'w') - - -def write_to_repo_file(repos: Dict[str, str], mode: str): + repos[new_name] = prop + # write to local config if inside a main path + main_paths = (prop['path'] for prop in repos.values() if prop['type'] == 'm') + cwd = os.getcwd() + is_local_config = True + for p in main_paths: + if is_relative_to(cwd, p): + write_to_repo_file(repos, 'w', p) + break + else: # global config + write_to_repo_file(repos, 'w') + is_local_config = False + # update groups only when outside any main repos + if is_local_config: + return + groups = get_groups() + for g, members in groups.items(): + if repo in members: + members.remove(repo) + members.append(new_name) + groups[g] = sorted(members) + write_to_groups_file(groups, 'w') + + +def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str, root=None): """ + @param repos: each repo is {name: {properties}} """ - data = ''.join(f'{path},{name}\n' for name, path in repos.items()) - fname = common.get_config_fname('repo_path') + data = [(prop['path'], name, prop['type'], ' '.join(prop['flags'])) + for name, prop in repos.items()] + fname = common.get_config_fname('repos.csv', root) os.makedirs(os.path.dirname(fname), exist_ok=True) - with open(fname, mode) as f: - f.write(data) + with open(fname, mode, newline='') as f: + writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerows(data) def write_to_groups_file(groups: Dict[str, List[str]], mode: str): """ """ - fname = common.get_config_fname('groups.yml') + fname = common.get_config_fname('groups.csv') os.makedirs(os.path.dirname(fname), exist_ok=True) if not groups: # all groups are deleted open(fname, 'w').close() else: - with open(fname, mode) as f: - yaml.dump(groups, f, default_flow_style=None) + with open(fname, mode, newline='') as f: + data = [ + (group, ' '.join(repos)) + for group, repos in groups.items() + ] + writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerows(data) + + +def _make_name(path: str, repos: Dict[str, Dict[str, str]], + name_counts: Counter) -> str: + """ + Given a new repo `path`, create a repo name. By default, basename is used. + If name collision exists, further include parent path name. + + @param path: It should not be in `repos` and is absolute + """ + name = os.path.basename(os.path.normpath(path)) + if name in repos or name_counts[name] > 1: + par_name = os.path.basename(os.path.dirname(path)) + return os.path.join(par_name, name) + return name -def add_repos(repos: Dict[str, str], new_paths: List[str]): +def _get_repo_type(path, repo_type, root) -> str: """ - Write new repo paths to file + + """ + if repo_type != '': # explicitly set + return repo_type + if root is not None and os.path.normpath(root) == os.path.normpath(path): + return 'm' + return '' + + +def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str], + repo_type='', root=None, is_bare=False) -> Dict[str, Dict[str, str]]: + """ + Write new repo paths to file; return the added repos. @param repos: name -> path """ - existing_paths = set(repos.values()) - new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p)) + existing_paths = {prop['path'] for prop in repos.values()} + new_paths = {p for p in new_paths if is_git(p, is_bare)} new_paths = new_paths - existing_paths + new_repos = {} if new_paths: print(f"Found {len(new_paths)} new repo(s).") - new_repos = { - os.path.basename(os.path.normpath(path)): path - for path in new_paths} - write_to_repo_file(new_repos, 'a+') + name_counts = Counter( + os.path.basename(os.path.normpath(p)) for p in new_paths + ) + new_repos = {_make_name(path, repos, name_counts): { + 'path': path, + 'type': _get_repo_type(path, repo_type, root), + 'flags': '', + } for path in new_paths} + # When root is not None, we could optionally set its type to 'm', i.e., + # main repo. + write_to_repo_file(new_repos, 'a+', root) else: print('No new repos found!') + return new_repos + + +def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[str, ...]: + """ + Return relative parent strings + + For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/ + then return (b, c, d) + """ + for p in paths: + if is_relative_to(repo_path, p): + break + else: + return () + return (os.path.basename(p), + *os.path.normpath(os.path.relpath(repo_path, p)).split(os.sep)[:-1]) + + +def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str] + ) -> Dict[str, List[str]]: + """ + + """ + # FIXME: the upstream code should make sure that paths are all independent + # i.e., each repo should be contained in one and only one path + new_groups = defaultdict(list) + for repo_name, prop in repos.items(): + hash = _generate_dir_hash(prop['path'], paths) + if not hash: + continue + for i in range(1, len(hash)+1): + group_name = '-'.join(hash[:i]) + new_groups[group_name].append(repo_name) + # FIXME: need to make sure the new group names don't clash with old ones + # or repo names + return new_groups def parse_clone_config(fname: str) -> Iterator[List[str]]: @@ -157,6 +286,7 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s Run `cmds` asynchronously in `path` directory. Return the `path` if execution fails. """ + # TODO: deprecated since 3.8, will be removed in 3.10 process = await asyncio.create_subprocess_exec( *cmds, stdin=asyncio.subprocess.DEVNULL, @@ -199,7 +329,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: return errors -def describe(repos: Dict[str, str], no_colors: bool=False) -> str: +def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str: """ Return the status of all repos """ @@ -213,9 +343,14 @@ def describe(repos: Dict[str, str], no_colors: bool=False) -> str: funcs[idx] = partial(get_repo_status, no_colors=True) for name in sorted(repos): - path = repos[name] - info_items = ' '.join(f(path) for f in funcs) - yield f'{name:<{name_width}}{info_items}' + info_items = ' '.join(f(repos[name]) for f in funcs) + if repos[name]['type'] == 'm': + # ANSI color code also takes length in Python + name = f'{info.Color.underline}{name}{info.Color.end}' + width = name_width + 8 + yield f'{name:<{width}}{info_items}' + else: + yield f'{name:<{name_width}}{info_items}' def get_cmds_from_files() -> Dict[str, Dict[str, str]]: @@ -231,17 +366,17 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]: } """ # default config file - fname = os.path.join(os.path.dirname(__file__), "cmds.yml") - with open(fname, 'r') as stream: - cmds = yaml.load(stream, Loader=yaml.FullLoader) + fname = os.path.join(os.path.dirname(__file__), "cmds.json") + with open(fname, 'r') as f: + cmds = json.load(f) # custom config file root = common.get_config_dir() - fname = os.path.join(root, 'cmds.yml') + fname = os.path.join(root, 'cmds.json') custom_cmds = {} if os.path.isfile(fname) and os.path.getsize(fname): - with open(fname, 'r') as stream: - custom_cmds = yaml.load(stream, Loader=yaml.FullLoader) + with open(fname, 'r') as f: + custom_cmds = json.load(f) # custom commands shadow default ones cmds.update(custom_cmds) -- cgit v1.2.3