diff options
Diffstat (limited to 'gita')
-rw-r--r-- | gita/__init__.py | 2 | ||||
-rw-r--r-- | gita/__main__.py | 825 | ||||
-rw-r--r-- | gita/cmds.json | 90 | ||||
-rw-r--r-- | gita/cmds.yml | 65 | ||||
-rw-r--r-- | gita/common.py | 17 | ||||
-rw-r--r-- | gita/info.py | 293 | ||||
-rw-r--r-- | gita/utils.py | 440 |
7 files changed, 1358 insertions, 374 deletions
diff --git a/gita/__init__.py b/gita/__init__.py index eeb79a3..33c0f41 100644 --- a/gita/__init__.py +++ b/gita/__init__.py @@ -1,3 +1,3 @@ import pkg_resources -__version__ = pkg_resources.get_distribution('gita').version +__version__ = pkg_resources.get_distribution("gita").version diff --git a/gita/__main__.py b/gita/__main__.py index 6e47f8e..b2bc32b 100644 --- a/gita/__main__.py +++ b/gita/__main__.py @@ -1,4 +1,4 @@ -''' +""" Gita manages multiple git repos. It has two functionalities 1. display the status of multiple repos side by side @@ -12,19 +12,82 @@ Examples: For bash auto completion, download and source https://github.com/nosarthur/gita/blob/master/.gita-completion.bash -''' +""" import os +import sys +import csv import argparse import subprocess +from functools import partial import pkg_resources +from itertools import chain +from pathlib import Path +import glob -from . import utils, info +from . import utils, info, common + + +def _group_name(name: str, exclude_old_names=True) -> str: + """ + Return valid group name + """ + repos = utils.get_repos() + if name in repos: + print(f"Cannot use group name {name} since it's a repo name.") + sys.exit(1) + if exclude_old_names: + if name in utils.get_groups(): + print(f"Cannot use group name {name} since it's already in use.") + sys.exit(1) + if name in {"none", "auto"}: + print(f"Cannot use group name {name} since it's a reserved keyword.") + sys.exit(1) + return name + + +def _path_name(name: str) -> str: + """ + Return absolute path + """ + if name: + return os.path.abspath(name) + return "" def f_add(args: argparse.Namespace): repos = utils.get_repos() - utils.add_repos(repos, args.paths) + paths = args.paths + dry_run = args.dry_run + groups = utils.get_groups() + if args.recursive or args.auto_group: + paths = ( + p.rstrip(os.path.sep) + for p in chain.from_iterable( + glob.glob(os.path.join(p, "**/"), recursive=True) for p in args.paths + ) + ) + new_repos = utils.add_repos( + repos, + paths, + include_bare=args.bare, + exclude_submodule=args.skip_submodule, + dry_run=dry_run, + ) + if dry_run: + return + if new_repos and args.auto_group: + new_groups = utils.auto_group(new_repos, args.paths) + if new_groups: + print(f"Created {len(new_groups)} new group(s).") + utils.write_to_groups_file(new_groups, "a+") + if new_repos and args.group: + gname = args.group + gname_repos = set(groups[gname]["repos"]) + gname_repos.update(new_repos) + groups[gname]["repos"] = sorted(gname_repos) + print(f"Added {len(new_repos)} repos to the {gname} group") + utils.write_to_groups_file(groups, "w") def f_rename(args: argparse.Namespace): @@ -32,12 +95,123 @@ def f_rename(args: argparse.Namespace): utils.rename_repo(repos, args.repo[0], args.new_name) -def f_info(_): - all_items, to_display = info.get_info_items() - print('In use:', ','.join(to_display)) - unused = set(all_items) - set(to_display) - if unused: - print('Unused:', ' '.join(unused)) +def f_flags(args: argparse.Namespace): + cmd = args.flags_cmd or "ll" + repos = utils.get_repos() + if cmd == "ll": + for r, prop in repos.items(): + if prop["flags"]: + print(f"{r}: {prop['flags']}") + elif cmd == "set": + # when in memory, flags are List[str], when on disk, they are space + # delimited str + repos[args.repo]["flags"] = args.flags + utils.write_to_repo_file(repos, "w") + + +def f_color(args: argparse.Namespace): + cmd = args.color_cmd or "ll" + if cmd == "ll": # pragma: no cover + info.show_colors() + elif cmd == "set": + colors = info.get_color_encoding() + colors[args.situation] = args.color + csv_config = common.get_config_fname("color.csv") + with open(csv_config, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=colors) + writer.writeheader() + writer.writerow(colors) + elif cmd == "reset": + Path(common.get_config_fname("color.csv")).unlink(missing_ok=True) + + +def f_info(args: argparse.Namespace): + to_display = info.get_info_items() + cmd = args.info_cmd or "ll" + if cmd == "ll": + print("In use:", ",".join(to_display)) + unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display))) + if unused: + print("Unused:", ",".join(unused)) + return + if cmd == "add" and args.info_item not in to_display: + to_display.append(args.info_item) + csv_config = common.get_config_fname("info.csv") + with open(csv_config, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(to_display) + elif cmd == "rm" and args.info_item in to_display: + to_display.remove(args.info_item) + csv_config = common.get_config_fname("info.csv") + with open(csv_config, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(to_display) + + +def f_clone(args: argparse.Namespace): + + if args.dry_run: + if args.from_file: + for url, repo_name, abs_path in utils.parse_clone_config(args.clonee): + print(f"git clone {url} {abs_path}") + else: + print(f"git clone {args.clonee}") + return + + if args.directory: + path = args.directory + else: + path = Path.cwd() + + if not args.from_file: + subprocess.run(["git", "clone", args.clonee], cwd=path) + # add the cloned repo to gita; group is also supported + cloned_path = os.path.join(path, args.clonee.split("/")[-1].split(".")[0]) + args.paths = [cloned_path] + args.recursive = args.auto_group = args.bare = args.skip_submodule = False + f_add(args) + return + + if args.preserve_path: + utils.exec_async_tasks( + utils.run_async(repo_name, path, ["git", "clone", url, abs_path]) + for url, repo_name, abs_path in utils.parse_clone_config(args.clonee) + ) + else: + utils.exec_async_tasks( + utils.run_async(repo_name, path, ["git", "clone", url]) + for url, repo_name, _ in utils.parse_clone_config(args.clonee) + ) + + +def f_freeze(args): + repos = utils.get_repos() + ctx = utils.get_context() + if args.group is None and ctx: + args.group = ctx.stem + group_repos = None + if args.group: # only display repos in this group + group_repos = utils.get_groups()[args.group]["repos"] + repos = {k: repos[k] for k in group_repos if k in repos} + seen = {""} + for name, prop in repos.items(): + path = prop["path"] + url = "" + # FIXME: capture_output is new in 3.7. Maybe drop support for 3.6 + cp = subprocess.run( + ["git", "remote", "-v"], + cwd=path, + universal_newlines=True, + capture_output=True, + ) + lines = cp.stdout.split("\n") + if cp.returncode == 0 and len(lines) > 0: + parts = lines[0].split() + if len(parts) > 1: + url = parts[1] + if url not in seen: + seen.add(url) + print(f"{url},{name},{path}") def f_ll(args: argparse.Namespace): @@ -45,62 +219,128 @@ def f_ll(args: argparse.Namespace): Display details of all repos """ repos = utils.get_repos() + ctx = utils.get_context() + if args.group is None and ctx: + args.group = ctx.stem + group_repos = None if args.group: # only display repos in this group - group_repos = utils.get_groups()[args.group] + group_repos = utils.get_groups()[args.group]["repos"] repos = {k: repos[k] for k in group_repos if k in repos} - for line in utils.describe(repos): - print(line) + if args.g: # display by group + if group_repos: + print(f"{args.group}:") + for line in utils.describe(repos, no_colors=args.no_colors): + print(" ", line) + else: + for g, prop in utils.get_groups().items(): + print(f"{g}:") + g_repos = {k: repos[k] for k in prop["repos"] if k in repos} + for line in utils.describe(g_repos, no_colors=args.no_colors): + print(" ", line) + else: + for line in utils.describe(repos, no_colors=args.no_colors): + print(line) def f_ls(args: argparse.Namespace): repos = utils.get_repos() if args.repo: # one repo, show its path - print(repos[args.repo]) + print(repos[args.repo]["path"]) else: # show names of all repos - print(' '.join(repos)) + print(" ".join(repos)) def f_group(args: argparse.Namespace): groups = utils.get_groups() - if args.to_group: - gname = input('group name? ') + cmd = args.group_cmd or "ll" + if cmd == "ll": + if "to_show" in args and args.to_show: + gname = args.to_show + print(" ".join(groups[gname]["repos"])) + else: + for group, prop in groups.items(): + print(f"{info.Color.underline}{group}{info.Color.end}: {prop['path']}") + for r in prop["repos"]: + print(" -", r) + elif cmd == "ls": + print(" ".join(groups)) + elif cmd == "rename": + new_name = args.new_name + gname = args.gname + groups[new_name] = groups[gname] + del groups[gname] + utils.write_to_groups_file(groups, "w") + # change context + ctx = utils.get_context() + if ctx and ctx.stem == gname: + utils.replace_context(ctx, new_name) + elif cmd == "rm": + ctx = utils.get_context() + for name in args.to_ungroup: + del groups[name] + if ctx and str(ctx.stem) == name: + utils.replace_context(ctx, "") + utils.write_to_groups_file(groups, "w") + elif cmd == "add": + gname = args.gname if gname in groups: - gname_repos = set(groups[gname]) + gname_repos = set(groups[gname]["repos"]) gname_repos.update(args.to_group) - groups[gname] = sorted(gname_repos) - utils.write_to_groups_file(groups, 'w') + groups[gname]["repos"] = sorted(gname_repos) + if "gpath" in args: + groups[gname]["path"] = args.gpath + utils.write_to_groups_file(groups, "w") else: - utils.write_to_groups_file({gname: sorted(args.to_group)}, 'a+') - else: - for group, repos in groups.items(): - print(f"{group}: {' '.join(repos)}") - - -def f_ungroup(args: argparse.Namespace): - groups = utils.get_groups() - to_ungroup = set(args.to_ungroup) - to_del = [] - for name, repos in groups.items(): - remaining = set(repos) - to_ungroup - if remaining: - groups[name] = list(sorted(remaining)) + gpath = "" + if "gpath" in args: + gpath = args.gpath + utils.write_to_groups_file( + {gname: {"repos": sorted(args.to_group), "path": gpath}}, "a+" + ) + elif cmd == "rmrepo": + gname = args.gname + if gname in groups: + group = { + gname: {"repos": groups[gname]["repos"], "path": groups[gname]["path"]} + } + for repo in args.to_rm: + utils.delete_repo_from_groups(repo, group) + groups[gname] = group[gname] + utils.write_to_groups_file(groups, "w") + + +def f_context(args: argparse.Namespace): + choice = args.choice + ctx = utils.get_context() + if choice is None: # display current context + if ctx: + group = ctx.stem + print(f"{group}: {' '.join(utils.get_groups()[group]['repos'])}") + elif (Path(common.get_config_dir()) / "auto.context").exists(): + print("auto: none detected!") else: - to_del.append(name) - for name in to_del: - del groups[name] - utils.write_to_groups_file(groups, 'w') + print("Context is not set") + else: # set context + utils.replace_context(ctx, choice) def f_rm(args: argparse.Namespace): """ Unregister repo(s) from gita """ - path_file = utils.get_config_fname('repo_path') + path_file = common.get_config_fname("repos.csv") if os.path.isfile(path_file): repos = utils.get_repos() + group_updated = False + groups = utils.get_groups() for repo in args.repo: del repos[repo] - utils.write_to_repo_file(repos, 'w') + up = utils.delete_repo_from_groups(repo, groups) + group_updated = group_updated or up + if group_updated: + utils.write_to_groups_file(groups, "w") + + utils.write_to_repo_file(repos, "w") def f_git_cmd(args: argparse.Namespace): @@ -108,32 +348,62 @@ def f_git_cmd(args: argparse.Namespace): Delegate git command/alias defined in `args.cmd`. Asynchronous execution is disabled for commands in the `args.async_blacklist`. """ - repos = utils.get_repos() - groups = utils.get_groups() - if args.repo: # with user specified repo(s) or group(s) - chosen = {} - for k in args.repo: - if k in repos: - chosen[k] = repos[k] - if k in groups: - for r in groups[k]: - chosen[r] = repos[r] - repos = chosen - cmds = ['git'] + args.cmd - if len(repos) == 1 or cmds[1] in args.async_blacklist: - for path in repos.values(): + if "_parsed_repos" in args: + repos = args._parsed_repos + else: + repos, _ = utils.parse_repos_and_rest(args.repo) + + per_repo_cmds = [] + for prop in repos.values(): + cmds = args.cmd.copy() + if cmds[0] == "git" and prop["flags"]: + cmds[1:1] = prop["flags"] + per_repo_cmds.append(cmds) + + # This async blacklist mechanism is broken if the git command name does + # not match with the gita command name. + if len(repos) == 1 or args.cmd[1] in args.async_blacklist: + for prop, cmds in zip(repos.values(), per_repo_cmds): + path = prop["path"] print(path) - subprocess.run(cmds, cwd=path) + subprocess.run(cmds, cwd=path, shell=args.shell) else: # run concurrent subprocesses # Async execution cannot deal with multiple repos' user name/password. # Here we shut off any user input in the async execution, and re-run # the failed ones synchronously. errors = utils.exec_async_tasks( - utils.run_async(repo_name, path, cmds) for repo_name, path in repos.items()) + utils.run_async(repo_name, prop["path"], cmds) + for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items()) + ) for path in errors: if path: print(path) - subprocess.run(cmds, cwd=path) + # FIXME: This is broken, flags are missing. But probably few + # people will use `gita flags` + subprocess.run(args.cmd, cwd=path) + + +def f_shell(args): + """ + Delegate shell command defined in `args.man`, which may or may not + contain repo names. + """ + repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode) + if not cmds: + print("Missing commands") + sys.exit(2) + + cmds = " ".join(cmds) # join the shell command into a single string + for name, prop in repos.items(): + # TODO: pull this out as a function + got = subprocess.run( + cmds, + cwd=prop["path"], + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + print(utils.format_output(got.stdout.decode(), name)) def f_super(args): @@ -141,59 +411,195 @@ def f_super(args): Delegate git command/alias defined in `args.man`, which may or may not contain repo names. """ - names = [] - repos = utils.get_repos() - groups = utils.get_groups() - for i, word in enumerate(args.man): - if word in repos or word in groups: - names.append(word) - else: - break - args.cmd = args.man[i:] - args.repo = names + repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode) + if not cmds: + print("Missing commands") + sys.exit(2) + + args.cmd = ["git"] + cmds + args._parsed_repos = repos + args.shell = False f_git_cmd(args) +def f_clear(_): + utils.write_to_groups_file({}, "w") + utils.write_to_repo_file({}, "w") + + def main(argv=None): - p = argparse.ArgumentParser(prog='gita', - formatter_class=argparse.RawTextHelpFormatter, - description=__doc__) - subparsers = p.add_subparsers(title='sub-commands', - help='additional help with sub-command -h') - - version = pkg_resources.require('gita')[0].version - p.add_argument('-v', - '--version', - action='version', - version=f'%(prog)s {version}') + p = argparse.ArgumentParser( + prog="gita", formatter_class=argparse.RawTextHelpFormatter, description=__doc__ + ) + subparsers = p.add_subparsers( + title="sub-commands", help="additional help with sub-command -h" + ) + + version = pkg_resources.require("gita")[0].version + p.add_argument("-v", "--version", action="version", version=f"%(prog)s {version}") # bookkeeping sub-commands - p_add = subparsers.add_parser('add', help='add repo(s)') - p_add.add_argument('paths', nargs='+', help="add repo(s)") + p_add = subparsers.add_parser("add", description="add repo(s)", help="add repo(s)") + p_add.add_argument("paths", nargs="+", type=_path_name, help="repo(s) to add") + p_add.add_argument("-n", "--dry-run", action="store_true", help="dry run") + p_add.add_argument( + "-g", + "--group", + choices=utils.get_groups(), + help="add repo(s) to the specified group", + ) + p_add.add_argument( + "-s", "--skip-submodule", action="store_true", help="skip submodule repo(s)" + ) + xgroup = p_add.add_mutually_exclusive_group() + xgroup.add_argument( + "-r", + "--recursive", + action="store_true", + help="recursively add repo(s) in the given path(s).", + ) + xgroup.add_argument( + "-a", + "--auto-group", + action="store_true", + help="recursively add repo(s) in the given path(s) " + "and create hierarchical groups based on folder structure.", + ) + xgroup.add_argument("-b", "--bare", action="store_true", help="add bare repo(s)") p_add.set_defaults(func=f_add) - p_rm = subparsers.add_parser('rm', help='remove repo(s)') - p_rm.add_argument('repo', - nargs='+', - choices=utils.get_repos(), - help="remove the chosen repo(s)") + p_rm = subparsers.add_parser( + "rm", description="remove repo(s)", help="remove repo(s)" + ) + p_rm.add_argument( + "repo", nargs="+", choices=utils.get_repos(), help="remove the chosen repo(s)" + ) p_rm.set_defaults(func=f_rm) - p_rename = subparsers.add_parser('rename', help='rename a repo') - p_rename.add_argument( - 'repo', - nargs=1, - choices=utils.get_repos(), - help="rename the chosen repo") + p_freeze = subparsers.add_parser( + "freeze", + description="print all repo information", + help="print all repo information", + ) + p_freeze.add_argument( + "-g", + "--group", + choices=utils.get_groups(), + help="freeze repos in the specified group", + ) + p_freeze.set_defaults(func=f_freeze) + + p_clone = subparsers.add_parser( + "clone", description="clone repos", help="clone repos" + ) + p_clone.add_argument( + "clonee", + help="A URL or a config file.", + ) + p_clone.add_argument( + "-C", + "--directory", + help="Change to DIRECTORY before doing anything.", + ) + p_clone.add_argument( + "-p", + "--preserve-path", + dest="preserve_path", + action="store_true", + help="clone repo(s) in their original paths", + ) + p_clone.add_argument( + "-n", + "--dry-run", + action="store_true", + help="If set, show command without execution", + ) + xgroup = p_clone.add_mutually_exclusive_group() + xgroup.add_argument( + "-g", + "--group", + choices=utils.get_groups(), + help="If set, add repo to the specified group after cloning, otherwise add to gita without group.", + ) + xgroup.add_argument( + "-f", + "--from-file", + action="store_true", + help="If set, clone repos in a config file rendered from `gita freeze`", + ) + p_clone.set_defaults(func=f_clone) + + p_rename = subparsers.add_parser( + "rename", description="rename a repo", help="rename a repo" + ) p_rename.add_argument( - 'new_name', - help="new name") + "repo", nargs=1, choices=utils.get_repos(), help="rename the chosen repo" + ) + p_rename.add_argument("new_name", help="new name") p_rename.set_defaults(func=f_rename) - p_info = subparsers.add_parser('info', help='show information items of the ll sub-command') + p_flags = subparsers.add_parser( + "flags", + description="Set custom git flags for repo.", + help="git flags configuration", + ) + p_flags.set_defaults(func=f_flags) + flags_cmds = p_flags.add_subparsers( + dest="flags_cmd", help="additional help with sub-command -h" + ) + flags_cmds.add_parser("ll", description="display repos with custom flags") + pf_set = flags_cmds.add_parser("set", description="Set flags for repo.") + pf_set.add_argument("repo", choices=utils.get_repos(), help="repo name") + pf_set.add_argument( + "flags", nargs=argparse.REMAINDER, help="custom flags, use quotes" + ) + + p_color = subparsers.add_parser( + "color", + description="display and modify branch coloring of the ll sub-command.", + help="color configuration", + ) + p_color.set_defaults(func=f_color) + color_cmds = p_color.add_subparsers( + dest="color_cmd", help="additional help with sub-command -h" + ) + color_cmds.add_parser( + "ll", + description="display available colors and the current branch coloring in the ll sub-command", + ) + color_cmds.add_parser("reset", description="reset color scheme.") + pc_set = color_cmds.add_parser( + "set", description="Set color for local/remote situation." + ) + pc_set.add_argument( + "situation", + choices=info.get_color_encoding(), + help="5 possible local/remote situations", + ) + pc_set.add_argument( + "color", choices=[c.name for c in info.Color], help="available colors" + ) + + p_info = subparsers.add_parser( + "info", + description="list, add, or remove information items of the ll sub-command.", + help="information setting", + ) p_info.set_defaults(func=f_info) - - ll_doc = f''' status symbols: + info_cmds = p_info.add_subparsers( + dest="info_cmd", help="additional help with sub-command -h" + ) + info_cmds.add_parser( + "ll", description="show used and unused information items of the ll sub-command" + ) + info_cmds.add_parser("add", description="Enable information item.").add_argument( + "info_item", choices=info.ALL_INFO_ITEMS, help="information item to add" + ) + info_cmds.add_parser("rm", description="Disable information item.").add_argument( + "info_item", choices=info.ALL_INFO_ITEMS, help="information item to delete" + ) + + ll_doc = f""" status symbols: +: staged changes *: unstaged changes _: untracked files/folders @@ -203,86 +609,213 @@ def main(argv=None): {info.Color.green}green{info.Color.end}: local is the same as remote {info.Color.red}red{info.Color.end}: local has diverged from remote {info.Color.purple}purple{info.Color.end}: local is ahead of remote (good for push) - {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)''' - p_ll = subparsers.add_parser('ll', - help='display summary of all repos', - formatter_class=argparse.RawTextHelpFormatter, - description=ll_doc) - p_ll.add_argument('group', - nargs='?', - choices=utils.get_groups(), - help="show repos in the chosen group") + {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)""" + p_ll = subparsers.add_parser( + "ll", + help="display summary of all repos", + formatter_class=argparse.RawTextHelpFormatter, + description=ll_doc, + ) + p_ll.add_argument( + "group", + nargs="?", + choices=utils.get_groups(), + help="show repos in the chosen group", + ) + p_ll.add_argument( + "-C", + "--no-colors", + action="store_true", + help="Disable coloring on the branch names.", + ) + p_ll.add_argument("-g", action="store_true", help="Show repo summaries by group.") p_ll.set_defaults(func=f_ll) + p_context = subparsers.add_parser( + "context", + help="set context", + description="Set and remove context. A context is a group." + " When set, all operations apply only to repos in that group.", + ) + p_context.add_argument( + "choice", + nargs="?", + choices=set().union(utils.get_groups(), {"none", "auto"}), + help="Without this argument, show current context. " + "Otherwise choose a group as context, or use 'auto', " + "which sets the context/group automatically based on " + "the current working directory. " + "To remove context, use 'none'. ", + ) + p_context.set_defaults(func=f_context) + p_ls = subparsers.add_parser( - 'ls', help='display names of all repos, or path of a chosen repo') - p_ls.add_argument('repo', - nargs='?', - choices=utils.get_repos(), - help="show path of the chosen repo") + "ls", + help="show repo(s) or repo path", + description="display names of all repos, or path of a chosen repo", + ) + p_ls.add_argument( + "repo", + nargs="?", + choices=utils.get_repos(), + help="show path of the chosen repo", + ) p_ls.set_defaults(func=f_ls) p_group = subparsers.add_parser( - 'group', help='group repos or display names of all groups if no repo is provided') - p_group.add_argument('to_group', - nargs='*', - choices=utils.get_choices(), - help="repo(s) to be grouped") + "group", description="list, add, or remove repo group(s)", help="group repos" + ) p_group.set_defaults(func=f_group) - - p_ungroup = subparsers.add_parser( - 'ungroup', help='remove group information for repos', - description="Remove group information on repos") - p_ungroup.add_argument('to_ungroup', - nargs='+', - choices=utils.get_repos(), - help="repo(s) to be ungrouped") - p_ungroup.set_defaults(func=f_ungroup) + group_cmds = p_group.add_subparsers( + dest="group_cmd", help="additional help with sub-command -h" + ) + pg_ll = group_cmds.add_parser("ll", description="List all groups with repos.") + pg_ll.add_argument( + "to_show", nargs="?", choices=utils.get_groups(), help="group to show" + ) + group_cmds.add_parser("ls", description="List all group names.") + pg_add = group_cmds.add_parser("add", description="Add repo(s) to a group.") + pg_add.add_argument( + "to_group", + nargs="+", + metavar="repo", + choices=utils.get_repos(), + help="repo(s) to be grouped", + ) + pg_add.add_argument( + "-n", + "--name", + dest="gname", + type=partial(_group_name, exclude_old_names=False), + metavar="group-name", + required=True, + ) + pg_add.add_argument( + "-p", "--path", dest="gpath", type=_path_name, metavar="group-path" + ) + + pg_rmrepo = group_cmds.add_parser( + "rmrepo", description="remove repo(s) from a group." + ) + pg_rmrepo.add_argument( + "to_rm", + nargs="+", + metavar="repo", + choices=utils.get_repos(), + help="repo(s) to be removed from the group", + ) + pg_rmrepo.add_argument( + "-n", + "--name", + dest="gname", + metavar="group-name", + required=True, + help="group name", + ) + pg_rename = group_cmds.add_parser("rename", description="Change group name.") + pg_rename.add_argument( + "gname", + metavar="group-name", + choices=utils.get_groups(), + help="existing group to rename", + ) + pg_rename.add_argument( + "new_name", metavar="new-name", type=_group_name, help="new group name" + ) + group_cmds.add_parser("rm", description="Remove group(s).").add_argument( + "to_ungroup", nargs="+", choices=utils.get_groups(), help="group(s) to delete" + ) # superman mode p_super = subparsers.add_parser( - 'super', - help='superman mode: delegate any git command/alias in specified or ' - 'all repo(s).\n' + "super", + help="run any git command/alias", + description="Superman mode: delegate any git command/alias in specified repo(s), group(s), or " + "all repo(s).\n" 'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n' - '\t gita super repo1 repo2 repo3 checkout new-feature') + "\t gita super repo1 repo2 repo3 checkout new-feature", + ) p_super.add_argument( - 'man', + "man", nargs=argparse.REMAINDER, - help="execute arbitrary git command/alias for specified or all repos " - "Example: gita super myrepo1 diff --name-only --staged " - "Another: gita super checkout master ") + help="execute arbitrary git command/alias for specified repo(s), group(s), or all repos.\n" + "Example: gita super repo1 diff --name-only --staged;\n" + "gita super checkout master ", + ) + p_super.add_argument( + "-q", "--quote-mode", action="store_true", help="use quote mode" + ) p_super.set_defaults(func=f_super) + # shell mode + p_shell = subparsers.add_parser( + "shell", + help="run any shell command", + description="shell mode: delegate any shell command in specified repo(s), group(s), or " + "all repo(s).\n" + "Examples:\n \t gita shell pwd; \n" + "\t gita shell repo1 repo2 repo3 touch xx", + ) + p_shell.add_argument( + "man", + nargs=argparse.REMAINDER, + help="execute arbitrary shell command for specified repo(s), group(s), or all repos.\n" + "Example: gita shell myrepo1 ls\n" + "Another: gita shell git checkout master ", + ) + p_shell.add_argument( + "-q", "--quote-mode", action="store_true", help="use quote mode" + ) + p_shell.set_defaults(func=f_shell) + + # clear + p_clear = subparsers.add_parser( + "clear", + description="removes all groups and repositories", + help="removes all groups and repositories", + ) + p_clear.set_defaults(func=f_clear) + # sub-commands that fit boilerplate cmds = utils.get_cmds_from_files() for name, data in cmds.items(): - help = data.get('help') - cmd = data.get('cmd') or name - if data.get('allow_all'): + help = data.get("help") + cmd = data["cmd"] + if data.get("allow_all"): choices = utils.get_choices() - nargs = '*' - help += ' for all repos or' + nargs = "*" + help += " for all repos or" else: choices = utils.get_repos().keys() | utils.get_groups().keys() - nargs = '+' - help += ' for the chosen repo(s) or group(s)' - sp = subparsers.add_parser(name, help=help) - sp.add_argument('repo', nargs=nargs, choices=choices, help=help) - sp.set_defaults(func=f_git_cmd, cmd=cmd.split()) + nargs = "+" + help += " for the chosen repo(s) or group(s)" + sp = subparsers.add_parser(name, description=help) + sp.add_argument("repo", nargs=nargs, choices=choices, help=help) + is_shell = bool(data.get("shell")) + sp.add_argument( + "-s", + "--shell", + default=is_shell, + type=bool, + help="If set, run in shell mode", + ) + if is_shell: + cmd = [cmd] + else: + cmd = cmd.split() + sp.set_defaults(func=f_git_cmd, cmd=cmd) args = p.parse_args(argv) args.async_blacklist = { - name - for name, data in cmds.items() if data.get('disable_async') + name for name, data in cmds.items() if data.get("disable_async") } - if 'func' in args: + if "func" in args: args.func(args) else: p.print_help() # pragma: no cover -if __name__ == '__main__': +if __name__ == "__main__": main() # pragma: no cover diff --git a/gita/cmds.json b/gita/cmds.json new file mode 100644 index 0000000..eadda81 --- /dev/null +++ b/gita/cmds.json @@ -0,0 +1,90 @@ +{ +"br":{ + "cmd": "git branch -vv", + "help":"show local branches"}, +"clean":{ + "cmd": "git clean -dfx", + "help": "remove all untracked files/folders"}, +"diff":{ + "cmd": "git diff", + "help": "git show differences"}, +"difftool":{ + "cmd": "git difftool", + "disable_async": true, + "help": "show differences using a tool" + }, +"fetch":{ + "cmd": "git fetch", + "allow_all": true, + "help": "fetch remote update" + }, +"last":{ + "cmd": "git log -1 HEAD", + "help": "show log information of HEAD" + }, +"log": + {"cmd": "git log", + "disable_async": true, + "help": "show logs" + }, +"merge":{ + "cmd": "git merge @{u}", + "help": "merge remote updates" + }, +"mergetool":{ + "cmd": "git mergetool", + "disable_async": true, + "help": "merge updates with a tool" + }, +"patch":{ + "cmd": "git format-patch HEAD~", + "help": "make a patch" + }, +"pull":{ + "cmd": "git pull", + "allow_all": true, + "help": "pull remote updates" + }, +"push":{ + "cmd": "git push", + "allow_all": true, + "help": "push the local updates" + }, +"rebase":{ + "cmd": "git rebase", + "help": "rebase from master" + }, +"reflog":{ + "cmd": "git reflog", + "help": "show ref logs" + }, +"remote":{ + "cmd": "git remote -v", + "help": "show remote settings" + }, +"reset":{ + "cmd": "git reset", + "help": "reset repo(s)" + }, +"show":{ + "cmd": "git show", + "disable_async": true, + "help": "show detailed commit information" + }, +"stash":{ + "cmd": "git stash", + "help": "store uncommited changes" + }, +"stat":{ + "cmd": "git diff --stat", + "help": "show edit statistics" + }, +"st":{ + "cmd": "git status", + "help": "show status" + }, +"tag":{ + "cmd": "git tag -n", + "help": "show tags" + } +} diff --git a/gita/cmds.yml b/gita/cmds.yml deleted file mode 100644 index 8db932e..0000000 --- a/gita/cmds.yml +++ /dev/null @@ -1,65 +0,0 @@ -br: - cmd: branch -vv - help: show local branches -clean: - cmd: clean -dfx - help: remove all untracked files/folders -diff: - help: show differences -difftool: - disable_async: true - help: show differences using a tool -fetch: - allow_all: true - help: fetch remote update -last: - cmd: log -1 HEAD - help: show log information of HEAD -log: - disable_async: true - help: show logs -merge: - cmd: merge @{u} - help: merge remote updates -mergetool: - disable_async: true - help: merge updates with a tool -patch: - cmd: format-patch HEAD~ - help: make a patch -pull: - allow_all: true - help: pull remote updates -push: - help: push the local updates -rebase: - help: rebase from master -reflog: - help: show ref logs -remote: - cmd: remote -v - help: show remote settings -reset: - help: reset repo(s) -shortlog: - disable_async: true - help: show short log -show: - disable_async: true - help: show detailed commit information -show-branch: - disable_async: true - help: show detailed branch information -stash: - help: store uncommited changes -stat: - cmd: diff --stat - help: show edit statistics -st: - help: show status -tag: - cmd: tag -n - help: show tags -whatchanged: - disable_async: true - help: show detailed log diff --git a/gita/common.py b/gita/common.py index 0a271fc..64116af 100644 --- a/gita/common.py +++ b/gita/common.py @@ -2,7 +2,16 @@ import os def get_config_dir() -> str: - parent = os.environ.get('XDG_CONFIG_HOME') or os.path.join( - os.path.expanduser('~'), '.config') - root = os.path.join(parent, "gita") - return root + root = ( + os.environ.get("GITA_PROJECT_HOME") + or os.environ.get("XDG_CONFIG_HOME") + or os.path.join(os.path.expanduser("~"), ".config") + ) + return os.path.join(root, "gita") + + +def get_config_fname(fname: str) -> str: + """ + Return the file name that stores the repo locations. + """ + return os.path.join(get_config_dir(), fname) diff --git a/gita/info.py b/gita/info.py index 18d20fd..10d8bea 100644 --- a/gita/info.py +++ b/gita/info.py @@ -1,146 +1,275 @@ -import os -import sys -import yaml +import csv import subprocess +from enum import Enum +from pathlib import Path +from collections import namedtuple +from functools import lru_cache, partial from typing import Tuple, List, Callable, Dict + from . import common -class Color: +class Color(Enum): """ Terminal color """ - red = '\x1b[31m' # local diverges from remote - green = '\x1b[32m' # local == remote - yellow = '\x1b[33m' # local is behind - blue = '\x1b[34m' - purple = '\x1b[35m' # local is ahead - cyan = '\x1b[36m' - white = '\x1b[37m' # no remote branch - end = '\x1b[0m' + black = "\x1b[30m" + red = "\x1b[31m" # local diverges from remote + green = "\x1b[32m" # local == remote + yellow = "\x1b[33m" # local is behind + blue = "\x1b[34m" + purple = "\x1b[35m" # local is ahead + cyan = "\x1b[36m" + white = "\x1b[37m" # no remote branch + end = "\x1b[0m" + b_black = "\x1b[30;1m" + b_red = "\x1b[31;1m" + b_green = "\x1b[32;1m" + b_yellow = "\x1b[33;1m" + b_blue = "\x1b[34;1m" + b_purple = "\x1b[35;1m" + b_cyan = "\x1b[36;1m" + b_white = "\x1b[37;1m" + underline = "\x1B[4m" + + # Make f"{Color.foo}" expand to Color.foo.value . + # + # See https://stackoverflow.com/a/24487545 + def __str__(self): + return f"{self.value}" + + +default_colors = { + "no_remote": Color.white.name, + "in_sync": Color.green.name, + "diverged": Color.red.name, + "local_ahead": Color.purple.name, + "remote_ahead": Color.yellow.name, +} + + +def show_colors(): # pragma: no cover + """ """ + for i, c in enumerate(Color, start=1): + if c != Color.end and c != Color.underline: + print(f"{c.value}{c.name:<8} ", end="") + if i % 9 == 0: + print() + print(f"{Color.end}") + for situation, c in sorted(get_color_encoding().items()): + print(f"{situation:<12}: {Color[c].value}{c:<8}{Color.end} ") -def get_info_funcs() -> List[Callable[[str], str]]: + +@lru_cache() +def get_color_encoding() -> Dict[str, str]: + """ + Return color scheme for different local/remote situations. + In the format of {situation: color name} + """ + # custom settings + csv_config = Path(common.get_config_fname("color.csv")) + if csv_config.is_file(): + with open(csv_config, "r") as f: + reader = csv.DictReader(f) + colors = next(reader) + else: + colors = default_colors + return colors + + +def get_info_funcs(no_colors=False) -> List[Callable[[str], str]]: """ Return the functions to generate `gita ll` information. All these functions take the repo path as input and return the corresponding information as str. See `get_path`, `get_repo_status`, `get_common_commit` for examples. """ - info_items, to_display = get_info_items() - return [info_items[k] for k in to_display] + to_display = get_info_items() + # This re-definition is to make unit test mocking to work + all_info_items = { + "branch": partial(get_repo_status, no_colors=no_colors), + "branch_name": get_repo_branch, + "commit_msg": get_commit_msg, + "commit_time": get_commit_time, + "path": get_path, + } + return [all_info_items[k] for k in to_display] -def get_info_items() -> Tuple[Dict[str, Callable[[str], str]], List[str]]: +def get_info_items() -> List[str]: """ - Return the available information items for display in the `gita ll` - sub-command, and the ones to be displayed. - It loads custom information functions and configuration if they exist. + Return the information items to be displayed in the `gita ll` command. """ - # default settings - info_items = {'branch': get_repo_status, - 'commit_msg': get_commit_msg, - 'path': get_path, } - display_items = ['branch', 'commit_msg'] - # custom settings - root = common.get_config_dir() - src_fname = os.path.join(root, 'extra_repo_info.py') - yml_fname = os.path.join(root, 'info.yml') - if os.path.isfile(src_fname): - sys.path.append(root) - from extra_repo_info import extra_info_items - info_items.update(extra_info_items) - if os.path.isfile(yml_fname): - with open(yml_fname, 'r') as stream: - display_items = yaml.load(stream, Loader=yaml.FullLoader) - display_items = [x for x in display_items if x in info_items] - return info_items, display_items + csv_config = Path(common.get_config_fname("info.csv")) + if csv_config.is_file(): + with open(csv_config, "r") as f: + reader = csv.reader(f) + display_items = next(reader) + display_items = [x for x in display_items if x in ALL_INFO_ITEMS] + else: + # default settings + display_items = ["branch", "commit_msg", "commit_time"] + return display_items -def get_path(path): - return Color.cyan + path + Color.end +def get_path(prop: Dict[str, str]) -> str: + return f'{Color.cyan}{prop["path"]}{Color.end}' +# TODO: do we need to add the flags here too? def get_head(path: str) -> str: - result = subprocess.run('git rev-parse --abbrev-ref HEAD'.split(), - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - universal_newlines=True, - cwd=path) + result = subprocess.run( + "git symbolic-ref -q --short HEAD || git describe --tags --exact-match", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + universal_newlines=True, + cwd=path, + ) return result.stdout.strip() -def run_quiet_diff(args: List[str]) -> bool: +def run_quiet_diff(flags: List[str], args: List[str], path) -> int: """ Return the return code of git diff `args` in quiet mode """ result = subprocess.run( - ['git', 'diff', '--quiet'] + args, + ["git"] + flags + ["diff", "--quiet"] + args, stderr=subprocess.DEVNULL, + cwd=path, ) return result.returncode -def get_common_commit() -> str: +def get_common_commit(path) -> str: """ Return the hash of the common commit of the local and upstream branches. """ - result = subprocess.run('git merge-base @{0} @{u}'.split(), - stdout=subprocess.PIPE, - universal_newlines=True) + result = subprocess.run( + "git merge-base @{0} @{u}".split(), + stdout=subprocess.PIPE, + universal_newlines=True, + cwd=path, + ) return result.stdout.strip() -def has_untracked() -> bool: +def has_untracked(flags: List[str], path) -> bool: """ Return True if untracked file/folder exists """ - result = subprocess.run('git ls-files -zo --exclude-standard'.split(), - stdout=subprocess.PIPE) + cmd = ["git"] + flags + "ls-files -zo --exclude-standard".split() + result = subprocess.run(cmd, stdout=subprocess.PIPE, cwd=path) return bool(result.stdout) -def get_commit_msg(path: str) -> str: +def get_commit_msg(prop: Dict[str, str]) -> str: """ Return the last commit message. """ # `git show-branch --no-name HEAD` is faster than `git show -s --format=%s` - result = subprocess.run('git show-branch --no-name HEAD'.split(), - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - universal_newlines=True, - cwd=path) + cmd = ["git"] + prop["flags"] + "show-branch --no-name HEAD".split() + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + universal_newlines=True, + cwd=prop["path"], + ) return result.stdout.strip() -def get_repo_status(path: str) -> str: - head = get_head(path) - dirty, staged, untracked, color = _get_repo_status(path) - return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}' +def get_commit_time(prop: Dict[str, str]) -> str: + """ + Return the last commit time in parenthesis. + """ + cmd = ["git"] + prop["flags"] + "log -1 --format=%cd --date=relative".split() + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + universal_newlines=True, + cwd=prop["path"], + ) + return f"({result.stdout.strip()})" + + +default_symbols = { + "dirty": "*", + "staged": "+", + "untracked": "?", + "local_ahead": "↑", + "remote_ahead": "↓", + "diverged": "⇕", + "in_sync": "", + "no_remote": "∅", + "": "", +} -def _get_repo_status(path: str) -> Tuple[str]: +@lru_cache() +def get_symbols() -> Dict[str, str]: + """ + return status symbols with customization + """ + custom = {} + csv_config = Path(common.get_config_fname("symbols.csv")) + if csv_config.is_file(): + with open(csv_config, "r") as f: + reader = csv.DictReader(f) + custom = next(reader) + default_symbols.update(custom) + return default_symbols + + +def get_repo_status(prop: Dict[str, str], no_colors=False) -> str: + branch = get_head(prop["path"]) + dirty, staged, untracked, situ = _get_repo_status(prop) + symbols = get_symbols() + info = f"{branch:<10} [{symbols[dirty]+symbols[staged]+symbols[untracked]+symbols[situ]}]" + + if no_colors: + return f"{info:<18}" + colors = {situ: Color[name].value for situ, name in get_color_encoding().items()} + color = colors[situ] + return f"{color}{info:<18}{Color.end}" + + +def get_repo_branch(prop: Dict[str, str]) -> str: + return get_head(prop["path"]) + + +def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]: """ Return the status of one repo """ - os.chdir(path) - dirty = '*' if run_quiet_diff([]) else '' - staged = '+' if run_quiet_diff(['--cached']) else '' - untracked = '_' if has_untracked() else '' - - diff_returncode = run_quiet_diff(['@{u}', '@{0}']) - has_no_remote = diff_returncode == 128 - has_no_diff = diff_returncode == 0 - if has_no_remote: - color = Color.white - elif has_no_diff: - color = Color.green + path = prop["path"] + flags = prop["flags"] + dirty = "dirty" if run_quiet_diff(flags, [], path) else "" + staged = "staged" if run_quiet_diff(flags, ["--cached"], path) else "" + untracked = "untracked" if has_untracked(flags, path) else "" + + diff_returncode = run_quiet_diff(flags, ["@{u}", "@{0}"], path) + if diff_returncode == 128: + situ = "no_remote" + elif diff_returncode == 0: + situ = "in_sync" else: - common_commit = get_common_commit() - outdated = run_quiet_diff(['@{u}', common_commit]) + common_commit = get_common_commit(path) + outdated = run_quiet_diff(flags, ["@{u}", common_commit], path) if outdated: - diverged = run_quiet_diff(['@{0}', common_commit]) - color = Color.red if diverged else Color.yellow + diverged = run_quiet_diff(flags, ["@{0}", common_commit], path) + situ = "diverged" if diverged else "remote_ahead" else: # local is ahead of remote - color = Color.purple - return dirty, staged, untracked, color + situ = "local_ahead" + return dirty, staged, untracked, situ + + +ALL_INFO_ITEMS = { + "branch", + "branch_name", + "commit_msg", + "commit_time", + "path", +} diff --git a/gita/utils.py b/gita/utils.py index d14484a..6746d7f 100644 --- a/gita/utils.py +++ b/gita/utils.py @@ -1,61 +1,160 @@ +import sys import os -import yaml +import json +import csv import asyncio import platform +import subprocess from functools import lru_cache -from typing import List, Dict, Coroutine, Union +from pathlib import Path +from typing import List, Dict, Coroutine, Union, Iterator, Tuple +from collections import Counter, defaultdict +from concurrent.futures import ThreadPoolExecutor +import multiprocessing from . import info from . import common -def get_config_fname(fname: str) -> str: +MAX_INT = sys.maxsize + + +def get_relative_path(kid: os.PathLike, parent: str) -> Union[List[str], None]: """ - Return the file name that stores the repo locations. + Return the relative path depth if relative, otherwise None. + + Both the `kid` and `parent` should be absolute paths """ - root = common.get_config_dir() - return os.path.join(root, fname) + if parent == "": + return None + + p_kid = Path(kid) + # p_kid = Path(kid).resolve() + try: + p_rel = p_kid.relative_to(parent) + except ValueError: + return None + rel = str(p_rel).split(os.sep) + if rel == ["."]: + rel = [] + return rel @lru_cache() -def get_repos() -> Dict[str, str]: +def get_repos() -> Dict[str, Dict[str, str]]: """ - Return a `dict` of repo name to repo absolute path + Return a `dict` of repo name to repo absolute path and repo type + """ - path_file = get_config_fname('repo_path') + path_file = common.get_config_fname("repos.csv") repos = {} - # Each line is a repo path and repo name separated by , if os.path.isfile(path_file) and os.stat(path_file).st_size > 0: with open(path_file) as f: - for line in f: - line = line.rstrip() - if not line: # blank line - continue - path, name = line.split(',') - if not is_git(path): - continue - if name not in repos: - repos[name] = path - else: # repo name collision for different paths: include parent path name - par_name = os.path.basename(os.path.dirname(path)) - repos[os.path.join(par_name, name)] = path + rows = csv.DictReader( + f, ["path", "name", "type", "flags"], restval="" + ) # it's actually a reader + repos = { + r["name"]: { + "path": r["path"], + "type": r["type"], + "flags": r["flags"].split(), + } + for r in rows + if is_git(r["path"], include_bare=True) + } return repos @lru_cache() -def get_groups() -> Dict[str, List[str]]: +def get_context() -> Union[Path, None]: + """ + Return context file path, or None if not set. Note that if in auto context + mode, the return value is not auto.context but the resolved context, + which could be None. + + """ + config_dir = Path(common.get_config_dir()) + matches = list(config_dir.glob("*.context")) + if len(matches) > 1: + print("Cannot have multiple .context file") + sys.exit(1) + if not matches: + return None + ctx = matches[0] + if ctx.stem == "auto": + # The context is set to be the group with minimal distance to cwd + candidate = None + min_dist = MAX_INT + for gname, prop in get_groups().items(): + rel = get_relative_path(Path.cwd(), prop["path"]) + if rel is None: + continue + d = len(rel) + if d < min_dist: + candidate = gname + min_dist = d + if not candidate: + ctx = None + else: + ctx = ctx.with_name(f"{candidate}.context") + return ctx + + +@lru_cache() +def get_groups() -> Dict[str, Dict[str, Union[str, List]]]: """ - Return a `dict` of group name to repo names. + Return a `dict` of group name to group properties such as repo names and + group path. """ - fname = get_config_fname('groups.yml') + fname = common.get_config_fname("groups.csv") groups = {} - # Each line is a repo path and repo name separated by , + repos = get_repos() + # Each line is: group-name:repo1 repo2 repo3:group-path if os.path.isfile(fname) and os.stat(fname).st_size > 0: - with open(fname, 'r') as f: - groups = yaml.load(f, Loader=yaml.FullLoader) + with open(fname, "r") as f: + rows = csv.DictReader( + f, ["name", "repos", "path"], restval="", delimiter=":" + ) + # filter out invalid repos + groups = { + r["name"]: { + "repos": [repo for repo in r["repos"].split() if repo in repos], + "path": r["path"], + } + for r in rows + } return groups +def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool: + """ + Delete repo from groups + """ + deleted = False + for name in groups: + try: + groups[name]["repos"].remove(repo) + except ValueError as e: + pass + else: + deleted = True + return deleted + + +def replace_context(old: Union[Path, None], new: str): + """ """ + auto = Path(common.get_config_dir()) / "auto.context" + if auto.exists(): + old = auto + + if new == "none": # delete + old and old.unlink() + elif old: + # ctx.rename(ctx.with_stem(new_name)) # only works in py3.9 + old.rename(old.with_name(f"{new}.context")) + else: + Path(auto.with_name(f"{new}.context")).write_text("") + def get_choices() -> List[Union[str, None]]: """ @@ -72,67 +171,209 @@ def get_choices() -> List[Union[str, None]]: return choices -def is_git(path: str) -> bool: +def is_submodule_repo(p: Path) -> bool: + """ """ + if p.is_file() and ".git/modules" in p.read_text(): + return True + return False + + +def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool: """ Return True if the path is a git repo. """ + if not os.path.exists(path): + return False # An alternative is to call `git rev-parse --is-inside-work-tree` # I don't see why that one is better yet. - # For a regular git repo, .git is a folder, for a worktree repo, .git is a file. - # However, git submodule repo also has .git as a file. + # For a regular git repo, .git is a folder. For a worktree repo and + # submodule repo, .git is a file. # A more reliable way to differentiable regular and worktree repos is to # compare the result of `git rev-parse --git-dir` and # `git rev-parse --git-common-dir` - loc = os.path.join(path, '.git') + loc = os.path.join(path, ".git") # TODO: we can display the worktree repos in a different font. - return os.path.exists(loc) - - -def rename_repo(repos: Dict[str, str], repo: str, new_name: str): + if os.path.exists(loc): + if exclude_submodule and is_submodule_repo(Path(loc)): + return False + return True + if not include_bare: + return False + # detect bare repo + got = subprocess.run( + "git rev-parse --is-bare-repository".split(), + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + cwd=path, + ) + if got.returncode == 0 and got.stdout == b"true\n": + return True + return False + + +def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str): """ Write new repo name to file """ - path = repos[repo] + if new_name in repos: + print(f"{new_name} is already in use!") + return + prop = repos[repo] del repos[repo] - repos[new_name] = path - write_to_repo_file(repos, 'w') + repos[new_name] = prop + write_to_repo_file(repos, "w") + groups = get_groups() + for g, values in groups.items(): + members = values["repos"] + if repo in members: + members.remove(repo) + members.append(new_name) + groups[g]["repos"] = sorted(members) + write_to_groups_file(groups, "w") -def write_to_repo_file(repos: Dict[str, str], mode: str): + +def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str): """ + @param repos: each repo is {name: {properties}} """ - data = ''.join(f'{path},{name}\n' for name, path in repos.items()) - fname = get_config_fname('repo_path') + # The 3rd column is repo type; unused field + data = [ + (prop["path"], name, "", " ".join(prop["flags"])) + for name, prop in repos.items() + ] + fname = common.get_config_fname("repos.csv") os.makedirs(os.path.dirname(fname), exist_ok=True) - with open(fname, mode) as f: - f.write(data) + with open(fname, mode, newline="") as f: + writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerows(data) -def write_to_groups_file(groups: Dict[str, List[str]], mode: str): +# TODO: combine with the repo writer +def write_to_groups_file(groups: Dict[str, Dict], mode: str): + """ """ + fname = common.get_config_fname("groups.csv") + os.makedirs(os.path.dirname(fname), exist_ok=True) + if not groups: # all groups are deleted + Path(fname).write_text("") + else: + # delete the group if there are no repos + for name in list(groups): + if not groups[name]["repos"]: + del groups[name] + with open(fname, mode, newline="") as f: + data = [ + (group, " ".join(prop["repos"]), prop["path"]) + for group, prop in groups.items() + ] + writer = csv.writer( + f, delimiter=":", quotechar='"', quoting=csv.QUOTE_MINIMAL + ) + writer.writerows(data) + + +def _make_name( + path: str, repos: Dict[str, Dict[str, str]], name_counts: Counter +) -> str: """ + Given a new repo `path`, create a repo name. By default, basename is used. + If name collision exists, further include parent path name. + @param path: It should not be in `repos` and is absolute """ - fname = get_config_fname('groups.yml') - os.makedirs(os.path.dirname(fname), exist_ok=True) - with open(fname, mode) as f: - yaml.dump(groups, f, default_flow_style=None) - - -def add_repos(repos: Dict[str, str], new_paths: List[str]): + name = os.path.basename(os.path.normpath(path)) + if name in repos or name_counts[name] > 1: + # path has no trailing / + par_name = os.path.basename(os.path.dirname(path)) + return os.path.join(par_name, name) + return name + + +def add_repos( + repos: Dict[str, Dict[str, str]], + new_paths: List[str], + include_bare=False, + exclude_submodule=False, + dry_run=False, +) -> Dict[str, Dict[str, str]]: """ - Write new repo paths to file + Write new repo paths to file; return the added repos. + + @param repos: name -> path """ - existing_paths = set(repos.values()) - new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p)) + existing_paths = {prop["path"] for prop in repos.values()} + new_paths = {p for p in new_paths if is_git(p, include_bare, exclude_submodule)} new_paths = new_paths - existing_paths + new_repos = {} if new_paths: print(f"Found {len(new_paths)} new repo(s).") + if dry_run: + for p in new_paths: + print(p) + return {} + name_counts = Counter(os.path.basename(os.path.normpath(p)) for p in new_paths) new_repos = { - os.path.basename(os.path.normpath(path)): path - for path in new_paths} - write_to_repo_file(new_repos, 'a+') + _make_name(path, repos, name_counts): { + "path": path, + "flags": "", + } + for path in new_paths + } + write_to_repo_file(new_repos, "a+") + else: + print("No new repos found!") + return new_repos + + +def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[Tuple[str, ...], str]: + """ + Return relative parent strings, and the parent head string + + For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/ + then return (b, c, d) + """ + for p in paths: + rel = get_relative_path(repo_path, p)[:-1] + if rel is not None: + break else: - print('No new repos found!') + return (), "" + head, tail = os.path.split(p) + return (tail, *rel), head + + +def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str, Dict]: + """ + + @params repos: repos to be grouped + """ + # FIXME: the upstream code should make sure that paths are all independent + # i.e., each repo should be contained in one and only one path + new_groups = defaultdict(dict) + for repo_name, prop in repos.items(): + hash, head = _generate_dir_hash(prop["path"], paths) + if not hash: + continue + for i in range(1, len(hash) + 1): + group_name = "-".join(hash[:i]) + prop = new_groups[group_name] + prop["path"] = os.path.join(head, *hash[:i]) + if "repos" not in prop: + prop["repos"] = [repo_name] + else: + prop["repos"].append(repo_name) + # FIXME: need to make sure the new group names don't clash with old ones + # or repo names + return new_groups + + +def parse_clone_config(fname: str) -> Iterator[List[str]]: + """ + Return the url, name, and path of all repos in `fname`. + """ + with open(fname) as f: + for line in f: + yield line.strip().split(",") async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]: @@ -140,17 +381,19 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s Run `cmds` asynchronously in `path` directory. Return the `path` if execution fails. """ + # TODO: deprecated since 3.8, will be removed in 3.10 process = await asyncio.create_subprocess_exec( *cmds, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, start_new_session=True, - cwd=path) + cwd=path, + ) stdout, stderr = await process.communicate() for pipe in (stdout, stderr): if pipe: - print(format_output(pipe.decode(), f'{repo_name}: ')) + print(format_output(pipe.decode(), repo_name)) # The existence of stderr is not good indicator since git sometimes write # to stderr even if the execution is successful, e.g. git fetch if process.returncode != 0: @@ -161,7 +404,7 @@ def format_output(s: str, prefix: str): """ Prepends every line in given string with the given prefix. """ - return ''.join([f'{prefix}{line}' for line in s.splitlines(keepends=True)]) + return "".join([f"{prefix}: {line}" for line in s.splitlines(keepends=True)]) def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: @@ -169,7 +412,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: Execute tasks asynchronously """ # TODO: asyncio API is nicer in python 3.7 - if platform.system() == 'Windows': + if platform.system() == "Windows": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) else: @@ -182,17 +425,21 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: return errors -def describe(repos: Dict[str, str]) -> str: +def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str: """ Return the status of all repos """ if repos: - name_width = max(len(n) for n in repos) + 1 - funcs = info.get_info_funcs() - for name in sorted(repos): - path = repos[name] - display_items = ' '.join(f(path) for f in funcs) - yield f'{name:<{name_width}}{display_items}' + name_width = len(max(repos, key=len)) + 1 + funcs = info.get_info_funcs(no_colors=no_colors) + + num_threads = min(multiprocessing.cpu_count(), len(repos)) + with ThreadPoolExecutor(max_workers=num_threads) as executor: + for line in executor.map( + lambda name: f'{name:<{name_width}}{" ".join(f(repos[name]) for f in funcs)}', + sorted(repos), + ): + yield line def get_cmds_from_files() -> Dict[str, Dict[str, str]]: @@ -208,18 +455,59 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]: } """ # default config file - fname = os.path.join(os.path.dirname(__file__), "cmds.yml") - with open(fname, 'r') as stream: - cmds = yaml.load(stream, Loader=yaml.FullLoader) + fname = os.path.join(os.path.dirname(__file__), "cmds.json") + with open(fname, "r") as f: + cmds = json.load(f) # custom config file root = common.get_config_dir() - fname = os.path.join(root, 'cmds.yml') + fname = os.path.join(root, "cmds.json") custom_cmds = {} if os.path.isfile(fname) and os.path.getsize(fname): - with open(fname, 'r') as stream: - custom_cmds = yaml.load(stream, Loader=yaml.FullLoader) + with open(fname, "r") as f: + custom_cmds = json.load(f) # custom commands shadow default ones cmds.update(custom_cmds) return cmds + + +def parse_repos_and_rest( + input: List[str], + quote_mode=False, +) -> Tuple[Dict[str, Dict[str, str]], List[str]]: + """ + Parse gita input arguments + + @return: repos and the rest (e.g., gita shell and super commands) + """ + i = 0 + names = [] + repos = get_repos() + groups = get_groups() + ctx = get_context() + for i, word in enumerate(input): + if word in repos or word in groups: + names.append(word) + else: + break + else: # all input is repos and groups, shift the index once more + if i is not None: + i += 1 + if not names and ctx: + names = [ctx.stem] + if quote_mode and i + 1 != len(input): + print(input[i], "is not a repo or group") + sys.exit(2) + + if names: + chosen = {} + for k in names: + if k in repos: + chosen[k] = repos[k] + if k in groups: + for r in groups[k]["repos"]: + chosen[r] = repos[r] + # if not set here, all repos are chosen + repos = chosen + return repos, input[i:] |