diff options
-rw-r--r-- | README.md | 7 | ||||
-rw-r--r-- | doc/README_CN.md | 7 | ||||
-rw-r--r-- | doc/video-outline.png | bin | 168883 -> 178810 bytes | |||
-rw-r--r-- | gita/__main__.py | 758 | ||||
-rw-r--r-- | gita/cmds.json | 1 | ||||
-rw-r--r-- | gita/utils.py | 13 | ||||
-rw-r--r-- | setup.py | 28 | ||||
-rw-r--r-- | tests/test_main.py | 624 |
8 files changed, 841 insertions, 597 deletions
@@ -70,8 +70,11 @@ The bookkeeping sub-commands are and automatically generate hierarchical groups. See the [customization section](#custom) for more details. - `gita add -b <bare-repo-path(s)>`: add bare repo(s) to `gita`. See the [customization section](#custom) for more details on setting custom worktree. - `gita add -r <repo-parent-path(s)>`: add repo(s) in <repo-parent-path(s)> recursively -- `gita clone <config-file>`: clone repos in `config-file` (generated by `gita freeze`) to current directory. -- `gita clone -p <config-file>`: clone repos in `config-file` to prescribed paths. +- `gita clear`: remove all groups and repos +- `gita clone <URL>`: clone repo from `URL` at current working directory +- `gita clone <URL> -C <directory>`: change to `directory` and then clone repo +- `gita clone -f <config-file>`: clone repos in `config-file` (generated by `gita freeze`) to current directory. +- `gita clone -p -f <config-file>`: clone repos in `config-file` to prescribed paths. - `gita context`: context sub-command - `gita context`: show current context - `gita context <group-name>`: set context to `group-name`, all operations then only apply to repos in this group diff --git a/doc/README_CN.md b/doc/README_CN.md index 23a9102..fbeee09 100644 --- a/doc/README_CN.md +++ b/doc/README_CN.md @@ -57,8 +57,11 @@ - `gita add -b <bare-repo-path(s)>`: 添加bare库 [customization section](#custom) - `gita add -r <repo-parent-path(s)>`: 递归添加路径下的所有库 -- `gita clone <config-file>`: 克隆`<config-file>` (由`gita freeze`生成)里的库 -- `gita clone -p <config-file>`: 克隆`<config-file>`里的库并放到指定路径 +- `gita clear`: +- `gita clone <URL>`: +- `gita clone <URL> -C <directory>`: +- `gita clone -f <config-file>`: 克隆`<config-file>` (由`gita freeze`生成)里的库 +- `gita clone -p -f <config-file>`: 克隆`<config-file>`里的库并放到指定路径 - `gita context`: 情境命令 - `gita context`: 显示当前的情境 - `gita context none`: 去除情境 diff --git a/doc/video-outline.png b/doc/video-outline.png Binary files differindex 256be4a..a54ea23 100644 --- a/doc/video-outline.png +++ b/doc/video-outline.png diff --git a/gita/__main__.py b/gita/__main__.py index b4057ec..2517958 100644 --- a/gita/__main__.py +++ b/gita/__main__.py @@ -1,4 +1,4 @@ -''' +""" Gita manages multiple git repos. It has two functionalities 1. display the status of multiple repos side by side @@ -12,7 +12,7 @@ Examples: For bash auto completion, download and source https://github.com/nosarthur/gita/blob/master/.gita-completion.bash -''' +""" import os import sys @@ -40,7 +40,7 @@ def _group_name(name: str, exclude_old_names=True) -> str: if name in utils.get_groups(): print(f"Cannot use group name {name} since it's already in use.") sys.exit(1) - if name in {'none', 'auto'}: + if name in {"none", "auto"}: print(f"Cannot use group name {name} since it's a reserved keyword.") sys.exit(1) return name @@ -52,31 +52,42 @@ def _path_name(name: str) -> str: """ if name: return os.path.abspath(name).rstrip(os.path.sep) - return '' + return "" def f_add(args: argparse.Namespace): repos = utils.get_repos() paths = args.paths + dry_run = args.dry_run groups = utils.get_groups() if args.recursive or args.auto_group: - paths = (p.rstrip(os.path.sep) for p in chain.from_iterable( - glob.glob(os.path.join(p, '**/'), recursive=True) - for p in args.paths)) - new_repos = utils.add_repos(repos, paths, include_bare=args.bare, - exclude_submodule=args.skip_submodule) + paths = ( + p.rstrip(os.path.sep) + for p in chain.from_iterable( + glob.glob(os.path.join(p, "**/"), recursive=True) for p in args.paths + ) + ) + new_repos = utils.add_repos( + repos, + paths, + include_bare=args.bare, + exclude_submodule=args.skip_submodule, + dry_run=dry_run, + ) + if dry_run: + return if new_repos and args.auto_group: new_groups = utils.auto_group(new_repos, args.paths) if new_groups: - print(f'Created {len(new_groups)} new group(s).') - utils.write_to_groups_file(new_groups, 'a+') + print(f"Created {len(new_groups)} new group(s).") + utils.write_to_groups_file(new_groups, "a+") if new_repos and args.group: gname = args.group - gname_repos = set(groups[gname]['repos']) + gname_repos = set(groups[gname]["repos"]) gname_repos.update(new_repos) - groups[gname]['repos'] = sorted(gname_repos) - print(f'Added {len(new_repos)} repos to the {gname} group') - utils.write_to_groups_file(groups, 'w') + groups[gname]["repos"] = sorted(gname_repos) + print(f"Added {len(new_repos)} repos to the {gname} group") + utils.write_to_groups_file(groups, "w") def f_rename(args: argparse.Namespace): @@ -85,87 +96,101 @@ def f_rename(args: argparse.Namespace): def f_flags(args: argparse.Namespace): - cmd = args.flags_cmd or 'll' + cmd = args.flags_cmd or "ll" repos = utils.get_repos() - if cmd == 'll': + if cmd == "ll": for r, prop in repos.items(): - if prop['flags']: + if prop["flags"]: print(f"{r}: {prop['flags']}") - elif cmd == 'set': + elif cmd == "set": # when in memory, flags are List[str], when on disk, they are space # delimited str - repos[args.repo]['flags'] = args.flags - utils.write_to_repo_file(repos, 'w') + repos[args.repo]["flags"] = args.flags + utils.write_to_repo_file(repos, "w") def f_color(args: argparse.Namespace): - cmd = args.color_cmd or 'll' - if cmd == 'll': # pragma: no cover + cmd = args.color_cmd or "ll" + if cmd == "ll": # pragma: no cover info.show_colors() - elif cmd == 'set': + elif cmd == "set": colors = info.get_color_encoding() colors[args.situation] = args.color - csv_config = common.get_config_fname('color.csv') - with open(csv_config, 'w', newline='') as f: + csv_config = common.get_config_fname("color.csv") + with open(csv_config, "w", newline="") as f: writer = csv.DictWriter(f, fieldnames=colors) writer.writeheader() writer.writerow(colors) - elif cmd == 'reset': - Path(common.get_config_fname('color.csv')).unlink(missing_ok=True) + elif cmd == "reset": + Path(common.get_config_fname("color.csv")).unlink(missing_ok=True) def f_info(args: argparse.Namespace): to_display = info.get_info_items() - cmd = args.info_cmd or 'll' - if cmd == 'll': - print('In use:', ','.join(to_display)) + cmd = args.info_cmd or "ll" + if cmd == "ll": + print("In use:", ",".join(to_display)) unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display))) if unused: - print('Unused:', ','.join(unused)) + print("Unused:", ",".join(unused)) return - if cmd == 'add' and args.info_item not in to_display: + if cmd == "add" and args.info_item not in to_display: to_display.append(args.info_item) - csv_config = common.get_config_fname('info.csv') - with open(csv_config, 'w', newline='') as f: + csv_config = common.get_config_fname("info.csv") + with open(csv_config, "w", newline="") as f: writer = csv.writer(f) writer.writerow(to_display) - elif cmd == 'rm' and args.info_item in to_display: + elif cmd == "rm" and args.info_item in to_display: to_display.remove(args.info_item) - csv_config = common.get_config_fname('info.csv') - with open(csv_config, 'w', newline='') as f: + csv_config = common.get_config_fname("info.csv") + with open(csv_config, "w", newline="") as f: writer = csv.writer(f) writer.writerow(to_display) def f_clone(args: argparse.Namespace): - path = Path.cwd() + if args.directory: + path = args.directory + else: + path = Path.cwd() + if not args.from_file: + subprocess.run(["git", "clone", args.clonee], cwd=path) + return + if args.preserve_path: utils.exec_async_tasks( - utils.run_async(repo_name, path, ['git', 'clone', url, abs_path]) - for url, repo_name, abs_path in utils.parse_clone_config(args.fname)) + utils.run_async(repo_name, path, ["git", "clone", url, abs_path]) + for url, repo_name, abs_path in utils.parse_clone_config(args.clonee) + ) else: utils.exec_async_tasks( - utils.run_async(repo_name, path, ['git', 'clone', url]) - for url, repo_name, _ in utils.parse_clone_config(args.fname)) + utils.run_async(repo_name, path, ["git", "clone", url]) + for url, repo_name, _ in utils.parse_clone_config(args.clonee) + ) def f_freeze(_): + # TODO: filter context repos = utils.get_repos() - seen = {''} + seen = {""} for name, prop in repos.items(): - path = prop['path'] - url = '' + path = prop["path"] + url = "" # FIXME: capture_output is new in 3.7. Maybe drop support for 3.6 - cp = subprocess.run(['git', 'remote', '-v'], cwd=path, - universal_newlines=True, capture_output=True) - lines = cp.stdout.split('\n') + cp = subprocess.run( + ["git", "remote", "-v"], + cwd=path, + universal_newlines=True, + capture_output=True, + ) + lines = cp.stdout.split("\n") if cp.returncode == 0 and len(lines) > 0: parts = lines[0].split() - if len(parts)>1: + if len(parts) > 1: url = parts[1] if url not in seen: seen.add(url) - print(f'{url},{name},{path}') + print(f"{url},{name},{path}") def f_ll(args: argparse.Namespace): @@ -178,19 +203,19 @@ def f_ll(args: argparse.Namespace): args.group = ctx.stem group_repos = None if args.group: # only display repos in this group - group_repos = utils.get_groups()[args.group]['repos'] + group_repos = utils.get_groups()[args.group]["repos"] repos = {k: repos[k] for k in group_repos if k in repos} if args.g: # display by group if group_repos: - print(f'{args.group}:') + print(f"{args.group}:") for line in utils.describe(repos, no_colors=args.no_colors): - print(' ', line) + print(" ", line) else: for g, prop in utils.get_groups().items(): - print(f'{g}:') - g_repos = {k: repos[k] for k in prop['repos'] if k in repos} + print(f"{g}:") + g_repos = {k: repos[k] for k in prop["repos"] if k in repos} for line in utils.describe(g_repos, no_colors=args.no_colors): - print(' ', line) + print(" ", line) else: for line in utils.describe(repos, no_colors=args.no_colors): print(line) @@ -199,69 +224,68 @@ def f_ll(args: argparse.Namespace): def f_ls(args: argparse.Namespace): repos = utils.get_repos() if args.repo: # one repo, show its path - print(repos[args.repo]['path']) + print(repos[args.repo]["path"]) else: # show names of all repos - print(' '.join(repos)) + print(" ".join(repos)) def f_group(args: argparse.Namespace): groups = utils.get_groups() - cmd = args.group_cmd or 'll' - if cmd == 'll': - if 'to_show' in args and args.to_show: + cmd = args.group_cmd or "ll" + if cmd == "ll": + if "to_show" in args and args.to_show: gname = args.to_show - print(' '.join(groups[gname]['repos'])) + print(" ".join(groups[gname]["repos"])) else: for group, prop in groups.items(): print(f"{info.Color.underline}{group}{info.Color.end}: {prop['path']}") - for r in prop['repos']: - print(' -', r) - elif cmd == 'ls': - print(' '.join(groups)) - elif cmd == 'rename': + for r in prop["repos"]: + print(" -", r) + elif cmd == "ls": + print(" ".join(groups)) + elif cmd == "rename": new_name = args.new_name gname = args.gname groups[new_name] = groups[gname] del groups[gname] - utils.write_to_groups_file(groups, 'w') + utils.write_to_groups_file(groups, "w") # change context ctx = utils.get_context() if ctx and ctx.stem == gname: utils.replace_context(ctx, new_name) - elif cmd == 'rm': + elif cmd == "rm": ctx = utils.get_context() for name in args.to_ungroup: del groups[name] if ctx and str(ctx.stem) == name: - utils.replace_context(ctx, '') - utils.write_to_groups_file(groups, 'w') - elif cmd == 'add': + utils.replace_context(ctx, "") + utils.write_to_groups_file(groups, "w") + elif cmd == "add": gname = args.gname if gname in groups: - gname_repos = set(groups[gname]['repos']) + gname_repos = set(groups[gname]["repos"]) gname_repos.update(args.to_group) - groups[gname]['repos'] = sorted(gname_repos) - if 'gpath' in args: - groups[gname]['path'] = args.gpath - utils.write_to_groups_file(groups, 'w') + groups[gname]["repos"] = sorted(gname_repos) + if "gpath" in args: + groups[gname]["path"] = args.gpath + utils.write_to_groups_file(groups, "w") else: - gpath = '' - if 'gpath' in args: + gpath = "" + if "gpath" in args: gpath = args.gpath utils.write_to_groups_file( - {gname: {'repos': sorted(args.to_group), - 'path': gpath}}, - 'a+') - elif cmd == 'rmrepo': + {gname: {"repos": sorted(args.to_group), "path": gpath}}, "a+" + ) + elif cmd == "rmrepo": gname = args.gname if gname in groups: - group = {gname: {'repos': groups[gname]['repos'], - 'path': groups[gname]['path'] - }} + group = { + gname: {"repos": groups[gname]["repos"], "path": groups[gname]["path"]} + } for repo in args.to_rm: utils.delete_repo_from_groups(repo, group) groups[gname] = group[gname] - utils.write_to_groups_file(groups, 'w') + utils.write_to_groups_file(groups, "w") def f_context(args: argparse.Namespace): @@ -271,10 +295,10 @@ def f_context(args: argparse.Namespace): if ctx: group = ctx.stem print(f"{group}: {' '.join(utils.get_groups()[group]['repos'])}") - elif (Path(common.get_config_dir()) / 'auto.context').exists(): - print('auto: none detected!') + elif (Path(common.get_config_dir()) / "auto.context").exists(): + print("auto: none detected!") else: - print('Context is not set') + print("Context is not set") else: # set context utils.replace_context(ctx, choice) @@ -283,7 +307,7 @@ def f_rm(args: argparse.Namespace): """ Unregister repo(s) from gita """ - path_file = common.get_config_fname('repos.csv') + path_file = common.get_config_fname("repos.csv") if os.path.isfile(path_file): repos = utils.get_repos() group_updated = False @@ -293,9 +317,9 @@ def f_rm(args: argparse.Namespace): up = utils.delete_repo_from_groups(repo, groups) group_updated = group_updated or up if group_updated: - utils.write_to_groups_file(groups, 'w') + utils.write_to_groups_file(groups, "w") - utils.write_to_repo_file(repos, 'w') + utils.write_to_repo_file(repos, "w") def f_git_cmd(args: argparse.Namespace): @@ -303,7 +327,7 @@ def f_git_cmd(args: argparse.Namespace): Delegate git command/alias defined in `args.cmd`. Asynchronous execution is disabled for commands in the `args.async_blacklist`. """ - if '_parsed_repos' in args: + if "_parsed_repos" in args: repos = args._parsed_repos else: repos, _ = utils.parse_repos_and_rest(args.repo) @@ -311,15 +335,15 @@ def f_git_cmd(args: argparse.Namespace): per_repo_cmds = [] for prop in repos.values(): cmds = args.cmd.copy() - if cmds[0] == 'git' and prop['flags']: - cmds[1:1] = prop['flags'] + if cmds[0] == "git" and prop["flags"]: + cmds[1:1] = prop["flags"] per_repo_cmds.append(cmds) # This async blacklist mechanism is broken if the git command name does # not match with the gita command name. if len(repos) == 1 or args.cmd[1] in args.async_blacklist: for prop, cmds in zip(repos.values(), per_repo_cmds): - path = prop['path'] + path = prop["path"] print(path) subprocess.run(cmds, cwd=path, shell=args.shell) else: # run concurrent subprocesses @@ -327,8 +351,9 @@ def f_git_cmd(args: argparse.Namespace): # Here we shut off any user input in the async execution, and re-run # the failed ones synchronously. errors = utils.exec_async_tasks( - utils.run_async(repo_name, prop['path'], cmds) - for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items())) + utils.run_async(repo_name, prop["path"], cmds) + for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items()) + ) for path in errors: if path: print(path) @@ -342,17 +367,21 @@ def f_shell(args): Delegate shell command defined in `args.man`, which may or may not contain repo names. """ - repos, cmds = utils.parse_repos_and_rest(args.man) + repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode) if not cmds: - print('Missing commands') + print("Missing commands") sys.exit(2) - cmds = ' '.join(cmds) # join the shell command into a single string + cmds = " ".join(cmds) # join the shell command into a single string for name, prop in repos.items(): # TODO: pull this out as a function - got = subprocess.run(cmds, cwd=prop['path'], shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + got = subprocess.run( + cmds, + cwd=prop["path"], + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) print(utils.format_output(got.stdout.decode(), name)) @@ -361,135 +390,176 @@ def f_super(args): Delegate git command/alias defined in `args.man`, which may or may not contain repo names. """ - repos, cmds = utils.parse_repos_and_rest(args.man) + repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode) if not cmds: - print('Missing commands') + print("Missing commands") sys.exit(2) - args.cmd = ['git'] + cmds + args.cmd = ["git"] + cmds args._parsed_repos = repos args.shell = False f_git_cmd(args) +def f_clear(_): + utils.write_to_groups_file({}, "w") + utils.write_to_repo_file({}, "w") + + def main(argv=None): - p = argparse.ArgumentParser(prog='gita', - formatter_class=argparse.RawTextHelpFormatter, - description=__doc__) - subparsers = p.add_subparsers(title='sub-commands', - help='additional help with sub-command -h') - - version = pkg_resources.require('gita')[0].version - p.add_argument('-v', - '--version', - action='version', - version=f'%(prog)s {version}') + p = argparse.ArgumentParser( + prog="gita", formatter_class=argparse.RawTextHelpFormatter, description=__doc__ + ) + subparsers = p.add_subparsers( + title="sub-commands", help="additional help with sub-command -h" + ) + + version = pkg_resources.require("gita")[0].version + p.add_argument("-v", "--version", action="version", version=f"%(prog)s {version}") # bookkeeping sub-commands - p_add = subparsers.add_parser('add', description='add repo(s)', - help='add repo(s)') - p_add.add_argument('paths', nargs='+', type=_path_name, help="repo(s) to add") - p_add.add_argument('-g','--group', - choices=utils.get_groups(), - help="add repo(s) to the specified group") - p_add.add_argument('-s', '--skip-submodule', action='store_true', - help="skip submodule repo(s)") + p_add = subparsers.add_parser("add", description="add repo(s)", help="add repo(s)") + p_add.add_argument("paths", nargs="+", type=_path_name, help="repo(s) to add") + p_add.add_argument("-n", "--dry-run", action="store_true", help="dry run") + p_add.add_argument( + "-g", + "--group", + choices=utils.get_groups(), + help="add repo(s) to the specified group", + ) + p_add.add_argument( + "-s", "--skip-submodule", action="store_true", help="skip submodule repo(s)" + ) xgroup = p_add.add_mutually_exclusive_group() - xgroup.add_argument('-r', '--recursive', action='store_true', - help="recursively add repo(s) in the given path(s).") - xgroup.add_argument('-a', '--auto-group', action='store_true', - help="recursively add repo(s) in the given path(s) " - "and create hierarchical groups based on folder structure.") - xgroup.add_argument('-b', '--bare', action='store_true', - help="add bare repo(s)") + xgroup.add_argument( + "-r", + "--recursive", + action="store_true", + help="recursively add repo(s) in the given path(s).", + ) + xgroup.add_argument( + "-a", + "--auto-group", + action="store_true", + help="recursively add repo(s) in the given path(s) " + "and create hierarchical groups based on folder structure.", + ) + xgroup.add_argument("-b", "--bare", action="store_true", help="add bare repo(s)") p_add.set_defaults(func=f_add) - p_rm = subparsers.add_parser('rm', description='remove repo(s)', - help='remove repo(s)') - p_rm.add_argument('repo', - nargs='+', - choices=utils.get_repos(), - help="remove the chosen repo(s)") + p_rm = subparsers.add_parser( + "rm", description="remove repo(s)", help="remove repo(s)" + ) + p_rm.add_argument( + "repo", nargs="+", choices=utils.get_repos(), help="remove the chosen repo(s)" + ) p_rm.set_defaults(func=f_rm) - p_freeze = subparsers.add_parser('freeze', - description='print all repo information', - help='print all repo information') + p_freeze = subparsers.add_parser( + "freeze", + description="print all repo information", + help="print all repo information", + ) p_freeze.set_defaults(func=f_freeze) - p_clone = subparsers.add_parser('clone', - description='clone repos from config file', - help='clone repos from config file') - p_clone.add_argument('fname', - help='config file. Its content should be the output of `gita freeze`.') - p_clone.add_argument('-p', '--preserve-path', dest='preserve_path', action='store_true', - help="clone repo(s) in their original paths") + p_clone = subparsers.add_parser( + "clone", description="clone repos", help="clone repos" + ) + p_clone.add_argument( + "clonee", + help="A URL or a config file.", + ) + p_clone.add_argument( + "-C", + "--directory", + help="Change to DIRECTORY before doing anything.", + ) + p_clone.add_argument( + "-f", + "--from-file", + action="store_true", + help="If set, clone repos in a config file rendered from `gita freeze`", + ) + p_clone.add_argument( + "-p", + "--preserve-path", + dest="preserve_path", + action="store_true", + help="clone repo(s) in their original paths", + ) p_clone.set_defaults(func=f_clone) - p_rename = subparsers.add_parser('rename', description='rename a repo', - help='rename a repo') + p_rename = subparsers.add_parser( + "rename", description="rename a repo", help="rename a repo" + ) p_rename.add_argument( - 'repo', - nargs=1, - choices=utils.get_repos(), - help="rename the chosen repo") - p_rename.add_argument('new_name', help="new name") + "repo", nargs=1, choices=utils.get_repos(), help="rename the chosen repo" + ) + p_rename.add_argument("new_name", help="new name") p_rename.set_defaults(func=f_rename) - p_flags = subparsers.add_parser('flags', - description='Set custom git flags for repo.', - help='git flags configuration') + p_flags = subparsers.add_parser( + "flags", + description="Set custom git flags for repo.", + help="git flags configuration", + ) p_flags.set_defaults(func=f_flags) - flags_cmds = p_flags.add_subparsers(dest='flags_cmd', - help='additional help with sub-command -h') - flags_cmds.add_parser('ll', - description='display repos with custom flags') - pf_set = flags_cmds.add_parser('set', - description='Set flags for repo.') - pf_set.add_argument('repo', choices=utils.get_repos(), - help="repo name") - pf_set.add_argument('flags', - nargs=argparse.REMAINDER, - help="custom flags, use quotes") - - p_color = subparsers.add_parser('color', - description='display and modify branch coloring of the ll sub-command.', - help='color configuration') + flags_cmds = p_flags.add_subparsers( + dest="flags_cmd", help="additional help with sub-command -h" + ) + flags_cmds.add_parser("ll", description="display repos with custom flags") + pf_set = flags_cmds.add_parser("set", description="Set flags for repo.") + pf_set.add_argument("repo", choices=utils.get_repos(), help="repo name") + pf_set.add_argument( + "flags", nargs=argparse.REMAINDER, help="custom flags, use quotes" + ) + + p_color = subparsers.add_parser( + "color", + description="display and modify branch coloring of the ll sub-command.", + help="color configuration", + ) p_color.set_defaults(func=f_color) - color_cmds = p_color.add_subparsers(dest='color_cmd', - help='additional help with sub-command -h') - color_cmds.add_parser('ll', - description='display available colors and the current branch coloring in the ll sub-command') - color_cmds.add_parser('reset', - description='reset color scheme.') - pc_set = color_cmds.add_parser('set', - description='Set color for local/remote situation.') - pc_set.add_argument('situation', - choices=info.get_color_encoding(), - help="5 possible local/remote situations") - pc_set.add_argument('color', - choices=[c.name for c in info.Color], - help="available colors") - - p_info = subparsers.add_parser('info', - description='list, add, or remove information items of the ll sub-command.', - help='information setting') + color_cmds = p_color.add_subparsers( + dest="color_cmd", help="additional help with sub-command -h" + ) + color_cmds.add_parser( + "ll", + description="display available colors and the current branch coloring in the ll sub-command", + ) + color_cmds.add_parser("reset", description="reset color scheme.") + pc_set = color_cmds.add_parser( + "set", description="Set color for local/remote situation." + ) + pc_set.add_argument( + "situation", + choices=info.get_color_encoding(), + help="5 possible local/remote situations", + ) + pc_set.add_argument( + "color", choices=[c.name for c in info.Color], help="available colors" + ) + + p_info = subparsers.add_parser( + "info", + description="list, add, or remove information items of the ll sub-command.", + help="information setting", + ) p_info.set_defaults(func=f_info) - info_cmds = p_info.add_subparsers(dest='info_cmd', - help='additional help with sub-command -h') - info_cmds.add_parser('ll', - description='show used and unused information items of the ll sub-command') - info_cmds.add_parser('add', description='Enable information item.' - ).add_argument('info_item', - choices=info.ALL_INFO_ITEMS, - help="information item to add") - info_cmds.add_parser('rm', description='Disable information item.' - ).add_argument('info_item', - choices=info.ALL_INFO_ITEMS, - help="information item to delete") - - - ll_doc = f''' status symbols: + info_cmds = p_info.add_subparsers( + dest="info_cmd", help="additional help with sub-command -h" + ) + info_cmds.add_parser( + "ll", description="show used and unused information items of the ll sub-command" + ) + info_cmds.add_parser("add", description="Enable information item.").add_argument( + "info_item", choices=info.ALL_INFO_ITEMS, help="information item to add" + ) + info_cmds.add_parser("rm", description="Disable information item.").add_argument( + "info_item", choices=info.ALL_INFO_ITEMS, help="information item to delete" + ) + + ll_doc = f""" status symbols: +: staged changes *: unstaged changes _: untracked files/folders @@ -499,146 +569,196 @@ def main(argv=None): {info.Color.green}green{info.Color.end}: local is the same as remote {info.Color.red}red{info.Color.end}: local has diverged from remote {info.Color.purple}purple{info.Color.end}: local is ahead of remote (good for push) - {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)''' - p_ll = subparsers.add_parser('ll', - help='display summary of all repos', - formatter_class=argparse.RawTextHelpFormatter, - description=ll_doc) - p_ll.add_argument('group', - nargs='?', - choices=utils.get_groups(), - help="show repos in the chosen group") - p_ll.add_argument('-C', '--no-colors', action='store_true', - help='Disable coloring on the branch names.') - p_ll.add_argument('-g', action='store_true', - help='Show repo summaries by group.') + {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)""" + p_ll = subparsers.add_parser( + "ll", + help="display summary of all repos", + formatter_class=argparse.RawTextHelpFormatter, + description=ll_doc, + ) + p_ll.add_argument( + "group", + nargs="?", + choices=utils.get_groups(), + help="show repos in the chosen group", + ) + p_ll.add_argument( + "-C", + "--no-colors", + action="store_true", + help="Disable coloring on the branch names.", + ) + p_ll.add_argument("-g", action="store_true", help="Show repo summaries by group.") p_ll.set_defaults(func=f_ll) - p_context = subparsers.add_parser('context', - help='set context', - description='Set and remove context. A context is a group.' - ' When set, all operations apply only to repos in that group.') - p_context.add_argument('choice', - nargs='?', - choices=set().union(utils.get_groups(), {'none', 'auto'}), - help="Without this argument, show current context. " - "Otherwise choose a group as context, or use 'auto', " - "which sets the context/group automatically based on " - "the current working directory. " - "To remove context, use 'none'. ") + p_context = subparsers.add_parser( + "context", + help="set context", + description="Set and remove context. A context is a group." + " When set, all operations apply only to repos in that group.", + ) + p_context.add_argument( + "choice", + nargs="?", + choices=set().union(utils.get_groups(), {"none", "auto"}), + help="Without this argument, show current context. " + "Otherwise choose a group as context, or use 'auto', " + "which sets the context/group automatically based on " + "the current working directory. " + "To remove context, use 'none'. ", + ) p_context.set_defaults(func=f_context) p_ls = subparsers.add_parser( - 'ls', help='show repo(s) or repo path', - description='display names of all repos, or path of a chosen repo') - p_ls.add_argument('repo', - nargs='?', - choices=utils.get_repos(), - help="show path of the chosen repo") + "ls", + help="show repo(s) or repo path", + description="display names of all repos, or path of a chosen repo", + ) + p_ls.add_argument( + "repo", + nargs="?", + choices=utils.get_repos(), + help="show path of the chosen repo", + ) p_ls.set_defaults(func=f_ls) p_group = subparsers.add_parser( - 'group', description='list, add, or remove repo group(s)', - help='group repos') + "group", description="list, add, or remove repo group(s)", help="group repos" + ) p_group.set_defaults(func=f_group) - group_cmds = p_group.add_subparsers(dest='group_cmd', - help='additional help with sub-command -h') - pg_ll = group_cmds.add_parser('ll', description='List all groups with repos.') - pg_ll.add_argument('to_show', - nargs='?', - choices=utils.get_groups(), - help="group to show") - group_cmds.add_parser('ls', description='List all group names.') - pg_add = group_cmds.add_parser('add', description='Add repo(s) to a group.') - pg_add.add_argument('to_group', - nargs='+', - metavar='repo', - choices=utils.get_repos(), - help="repo(s) to be grouped") - pg_add.add_argument('-n', '--name', - dest='gname', - type=partial(_group_name, exclude_old_names=False), - metavar='group-name', - required=True) - pg_add.add_argument('-p', '--path', - dest='gpath', - type=_path_name, - metavar='group-path') - - pg_rmrepo = group_cmds.add_parser('rmrepo', description='remove repo(s) from a group.') - pg_rmrepo.add_argument('to_rm', - nargs='+', - metavar='repo', - choices=utils.get_repos(), - help="repo(s) to be removed from the group") - pg_rmrepo.add_argument('-n', '--name', - dest='gname', - metavar='group-name', - required=True, - help="group name") - pg_rename = group_cmds.add_parser('rename', description='Change group name.') - pg_rename.add_argument('gname', metavar='group-name', - choices=utils.get_groups(), - help="existing group to rename") - pg_rename.add_argument('new_name', metavar='new-name', - type=_group_name, - help="new group name") - group_cmds.add_parser('rm', - description='Remove group(s).').add_argument('to_ungroup', - nargs='+', - choices=utils.get_groups(), - help="group(s) to delete") + group_cmds = p_group.add_subparsers( + dest="group_cmd", help="additional help with sub-command -h" + ) + pg_ll = group_cmds.add_parser("ll", description="List all groups with repos.") + pg_ll.add_argument( + "to_show", nargs="?", choices=utils.get_groups(), help="group to show" + ) + group_cmds.add_parser("ls", description="List all group names.") + pg_add = group_cmds.add_parser("add", description="Add repo(s) to a group.") + pg_add.add_argument( + "to_group", + nargs="+", + metavar="repo", + choices=utils.get_repos(), + help="repo(s) to be grouped", + ) + pg_add.add_argument( + "-n", + "--name", + dest="gname", + type=partial(_group_name, exclude_old_names=False), + metavar="group-name", + required=True, + ) + pg_add.add_argument( + "-p", "--path", dest="gpath", type=_path_name, metavar="group-path" + ) + + pg_rmrepo = group_cmds.add_parser( + "rmrepo", description="remove repo(s) from a group." + ) + pg_rmrepo.add_argument( + "to_rm", + nargs="+", + metavar="repo", + choices=utils.get_repos(), + help="repo(s) to be removed from the group", + ) + pg_rmrepo.add_argument( + "-n", + "--name", + dest="gname", + metavar="group-name", + required=True, + help="group name", + ) + pg_rename = group_cmds.add_parser("rename", description="Change group name.") + pg_rename.add_argument( + "gname", + metavar="group-name", + choices=utils.get_groups(), + help="existing group to rename", + ) + pg_rename.add_argument( + "new_name", metavar="new-name", type=_group_name, help="new group name" + ) + group_cmds.add_parser("rm", description="Remove group(s).").add_argument( + "to_ungroup", nargs="+", choices=utils.get_groups(), help="group(s) to delete" + ) # superman mode p_super = subparsers.add_parser( - 'super', - help='run any git command/alias', - description='Superman mode: delegate any git command/alias in specified repo(s), group(s), or ' - 'all repo(s).\n' + "super", + help="run any git command/alias", + description="Superman mode: delegate any git command/alias in specified repo(s), group(s), or " + "all repo(s).\n" 'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n' - '\t gita super repo1 repo2 repo3 checkout new-feature') + "\t gita super repo1 repo2 repo3 checkout new-feature", + ) p_super.add_argument( - 'man', + "man", nargs=argparse.REMAINDER, help="execute arbitrary git command/alias for specified repo(s), group(s), or all repos.\n" "Example: gita super repo1 diff --name-only --staged;\n" - "gita super checkout master ") + "gita super checkout master ", + ) + p_super.add_argument( + "-q", "--quote-mode", action="store_true", help="use quote mode" + ) p_super.set_defaults(func=f_super) # shell mode p_shell = subparsers.add_parser( - 'shell', - help='run any shell command', - description='shell mode: delegate any shell command in specified repo(s), group(s), or ' - 'all repo(s).\n' - 'Examples:\n \t gita shell pwd; \n' - '\t gita shell repo1 repo2 repo3 touch xx') + "shell", + help="run any shell command", + description="shell mode: delegate any shell command in specified repo(s), group(s), or " + "all repo(s).\n" + "Examples:\n \t gita shell pwd; \n" + "\t gita shell repo1 repo2 repo3 touch xx", + ) p_shell.add_argument( - 'man', + "man", nargs=argparse.REMAINDER, help="execute arbitrary shell command for specified repo(s), group(s), or all repos.\n" "Example: gita shell myrepo1 ls\n" - "Another: gita shell git checkout master ") + "Another: gita shell git checkout master ", + ) + p_shell.add_argument( + "-q", "--quote-mode", action="store_true", help="use quote mode" + ) p_shell.set_defaults(func=f_shell) + # clear + p_clear = subparsers.add_parser( + "clear", + description="removes all groups and repositories", + help="removes all groups and repositories", + ) + p_clear.set_defaults(func=f_clear) + # sub-commands that fit boilerplate cmds = utils.get_cmds_from_files() for name, data in cmds.items(): - help = data.get('help') - cmd = data['cmd'] - if data.get('allow_all'): + help = data.get("help") + cmd = data["cmd"] + if data.get("allow_all"): choices = utils.get_choices() - nargs = '*' - help += ' for all repos or' + nargs = "*" + help += " for all repos or" else: choices = utils.get_repos().keys() | utils.get_groups().keys() - nargs = '+' - help += ' for the chosen repo(s) or group(s)' + nargs = "+" + help += " for the chosen repo(s) or group(s)" sp = subparsers.add_parser(name, description=help) - sp.add_argument('repo', nargs=nargs, choices=choices, help=help) - is_shell = bool(data.get('shell')) - sp.add_argument('-s', '--shell', default=is_shell, type=bool, - help='If set, run in shell mode') + sp.add_argument("repo", nargs=nargs, choices=choices, help=help) + is_shell = bool(data.get("shell")) + sp.add_argument( + "-s", + "--shell", + default=is_shell, + type=bool, + help="If set, run in shell mode", + ) if is_shell: cmd = [cmd] else: @@ -648,14 +768,14 @@ def main(argv=None): args = p.parse_args(argv) args.async_blacklist = { - name - for name, data in cmds.items() if data.get('disable_async') + name for name, data in cmds.items() if data.get("disable_async") } - if 'func' in args: + if "func" in args: args.func(args) else: p.print_help() # pragma: no cover -if __name__ == '__main__': + +if __name__ == "__main__": main() # pragma: no cover diff --git a/gita/cmds.json b/gita/cmds.json index 947a4dd..eadda81 100644 --- a/gita/cmds.json +++ b/gita/cmds.json @@ -47,6 +47,7 @@ }, "push":{ "cmd": "git push", + "allow_all": true, "help": "push the local updates" }, "rebase":{ diff --git a/gita/utils.py b/gita/utils.py index 8e7d9c4..5332255 100644 --- a/gita/utils.py +++ b/gita/utils.py @@ -279,6 +279,7 @@ def _make_name(path: str, repos: Dict[str, Dict[str, str]], def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str], include_bare=False, exclude_submodule=False, + dry_run=False, ) -> Dict[str, Dict[str, str]]: """ Write new repo paths to file; return the added repos. @@ -291,6 +292,10 @@ def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str], new_repos = {} if new_paths: print(f"Found {len(new_paths)} new repo(s).") + if dry_run: + for p in new_paths: + print(p) + return {} name_counts = Counter( os.path.basename(os.path.normpath(p)) for p in new_paths ) @@ -453,14 +458,14 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]: return cmds -def parse_repos_and_rest(input: List[str] +def parse_repos_and_rest(input: List[str], quote_mode=False, ) -> Tuple[Dict[str, Dict[str, str]], List[str]]: """ Parse gita input arguments @return: repos and the rest (e.g., gita shell and super commands) """ - i = None + i = 0 names = [] repos = get_repos() groups = get_groups() @@ -475,6 +480,10 @@ def parse_repos_and_rest(input: List[str] i += 1 if not names and ctx: names = [ctx.stem] + if quote_mode and i + 1 != len(input): + print(input[i], 'is not a repo or group' ) + sys.exit(2) + if names: chosen = {} for k in names: @@ -1,24 +1,24 @@ from setuptools import setup long_description = None -with open('README.md', encoding='utf-8') as f: +with open("README.md", encoding="utf-8") as f: long_description = f.read() setup( - name='gita', - packages=['gita'], - version='0.16.1', - license='MIT', - description='Manage multiple git repos with sanity', + name="gita", + packages=["gita"], + version="0.16.2", + license="MIT", + description="Manage multiple git repos with sanity", long_description=long_description, - long_description_content_type='text/markdown', - url='https://github.com/nosarthur/gita', - platforms=['linux', 'osx', 'win32'], - keywords=['git', 'manage multiple repositories', 'cui', 'command-line'], - author='Dong Zhou', - author_email='zhou.dong@gmail.com', - entry_points={'console_scripts': ['gita = gita.__main__:main']}, - python_requires='~=3.6', + long_description_content_type="text/markdown", + url="https://github.com/nosarthur/gita", + platforms=["linux", "osx", "win32"], + keywords=["git", "manage multiple repositories", "cui", "command-line"], + author="Dong Zhou", + author_email="zhou.dong@gmail.com", + entry_points={"console_scripts": ["gita = gita.__main__:main"]}, + python_requires="~=3.6", classifiers=[ "Development Status :: 4 - Beta", "Intended Audience :: Developers", diff --git a/tests/test_main.py b/tests/test_main.py index 3941ea9..e36c2d8 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,4 +1,3 @@ -import os import pytest from unittest.mock import patch from pathlib import Path @@ -7,312 +6,368 @@ import asyncio import shlex from gita import __main__ -from gita import utils, info, common +from gita import utils, info from conftest import ( - PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME, - async_mock, TEST_DIR, + PATH_FNAME, + PATH_FNAME_EMPTY, + PATH_FNAME_CLASH, + GROUP_FNAME, + async_mock, + TEST_DIR, ) -@patch('gita.utils.get_repos', return_value={'aa'}) +@patch("gita.utils.get_repos", return_value={"aa"}) def test_group_name(_): - got = __main__._group_name('xx') - assert got == 'xx' + got = __main__._group_name("xx") + assert got == "xx" with pytest.raises(SystemExit): - __main__._group_name('aa') + __main__._group_name("aa") class TestAdd: - - @pytest.mark.parametrize('input, expected', [ - (['add', '.'], ''), - ]) - @patch('gita.common.get_config_fname') + @pytest.mark.parametrize( + "input, expected", + [ + (["add", "."], ""), + ], + ) + @patch("gita.common.get_config_fname") def test_add(self, mock_path_fname, tmp_path, input, expected): def side_effect(input, _=None): - return tmp_path / f'{input}.txt' + return tmp_path / f"{input}.txt" + mock_path_fname.side_effect = side_effect utils.get_repos.cache_clear() __main__.main(input) utils.get_repos.cache_clear() got = utils.get_repos() assert len(got) == 1 - assert got['gita']['type'] == expected + assert got["gita"]["type"] == expected -@pytest.mark.parametrize('path_fname, expected', [ - (PATH_FNAME, ''), - (PATH_FNAME_CLASH, "repo2: ['--haha', '--pp']\n"), - ]) -@patch('gita.utils.is_git', return_value=True) -@patch('gita.utils.get_groups', return_value={}) -@patch('gita.common.get_config_fname') +@pytest.mark.parametrize( + "path_fname, expected", + [ + (PATH_FNAME, ""), + (PATH_FNAME_CLASH, "repo2: ['--haha', '--pp']\n"), + ], +) +@patch("gita.utils.is_git", return_value=True) +@patch("gita.utils.get_groups", return_value={}) +@patch("gita.common.get_config_fname") def test_flags(mock_path_fname, _, __, path_fname, expected, capfd): mock_path_fname.return_value = path_fname utils.get_repos.cache_clear() - __main__.main(['flags']) + __main__.main(["flags"]) out, err = capfd.readouterr() - assert err == '' + assert err == "" assert out == expected class TestLsLl: - @patch('gita.common.get_config_fname') + @patch("gita.common.get_config_fname") def test_ll(self, mock_path_fname, capfd, tmp_path): """ functional test """ # avoid modifying the local configuration def side_effect(input, _=None): - return tmp_path / f'{input}.txt' + return tmp_path / f"{input}.txt" + mock_path_fname.side_effect = side_effect utils.get_repos.cache_clear() - __main__.main(['add', '.']) + __main__.main(["add", "."]) out, err = capfd.readouterr() - assert err == '' - assert 'Found 1 new repo(s).\n' == out + assert err == "" + assert "Found 1 new repo(s).\n" == out # in production this is not needed utils.get_repos.cache_clear() - __main__.main(['ls']) + __main__.main(["ls"]) out, err = capfd.readouterr() - assert err == '' - assert 'gita\n' == out + assert err == "" + assert "gita\n" == out - __main__.main(['ll']) + __main__.main(["ll"]) out, err = capfd.readouterr() - assert err == '' - assert 'gita' in out + assert err == "" + assert "gita" in out assert info.Color.end in out # no color on branch name - __main__.main(['ll', '-C']) + __main__.main(["ll", "-C"]) out, err = capfd.readouterr() - assert err == '' - assert 'gita' in out + assert err == "" + assert "gita" in out assert info.Color.end not in out - __main__.main(['ls', 'gita']) + __main__.main(["ls", "gita"]) out, err = capfd.readouterr() - assert err == '' - assert out.strip() == utils.get_repos()['gita']['path'] + assert err == "" + assert out.strip() == utils.get_repos()["gita"]["path"] def test_ls(self, monkeypatch, capfd): - monkeypatch.setattr(utils, 'get_repos', - lambda: {'repo1': {'path': '/a/'}, 'repo2': {'path': '/b/'}}) - monkeypatch.setattr(utils, 'describe', lambda x: x) - __main__.main(['ls']) + monkeypatch.setattr( + utils, + "get_repos", + lambda: {"repo1": {"path": "/a/"}, "repo2": {"path": "/b/"}}, + ) + monkeypatch.setattr(utils, "describe", lambda x: x) + __main__.main(["ls"]) out, err = capfd.readouterr() - assert err == '' + assert err == "" assert out == "repo1 repo2\n" - __main__.main(['ls', 'repo1']) + __main__.main(["ls", "repo1"]) out, err = capfd.readouterr() - assert err == '' - assert out == '/a/\n' - - @pytest.mark.parametrize('path_fname, expected', [ - (PATH_FNAME, - "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \nxxx cmaster dsu\x1b[0m msg \n"), - (PATH_FNAME_EMPTY, ""), - (PATH_FNAME_CLASH, - "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \n" - ), - ]) - @patch('gita.utils.is_git', return_value=True) - @patch('gita.info.get_head', return_value="master") - @patch('gita.info._get_repo_status', return_value=("d", "s", "u", "c")) - @patch('gita.info.get_commit_msg', return_value="msg") - @patch('gita.info.get_commit_time', return_value="") - @patch('gita.common.get_config_fname') - def test_with_path_files(self, mock_path_fname, _0, _1, _2, _3, _4, path_fname, - expected, capfd): + assert err == "" + assert out == "/a/\n" + + @pytest.mark.parametrize( + "path_fname, expected", + [ + ( + PATH_FNAME, + "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \nxxx cmaster dsu\x1b[0m msg \n", + ), + (PATH_FNAME_EMPTY, ""), + ( + PATH_FNAME_CLASH, + "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \n", + ), + ], + ) + @patch("gita.utils.is_git", return_value=True) + @patch("gita.info.get_head", return_value="master") + @patch("gita.info._get_repo_status", return_value=("d", "s", "u", "c")) + @patch("gita.info.get_commit_msg", return_value="msg") + @patch("gita.info.get_commit_time", return_value="") + @patch("gita.common.get_config_fname") + def test_with_path_files( + self, mock_path_fname, _0, _1, _2, _3, _4, path_fname, expected, capfd + ): def side_effect(input, _=None): - if input == 'repos.csv': + if input == "repos.csv": return path_fname - return f'/{input}' + return f"/{input}" + mock_path_fname.side_effect = side_effect utils.get_repos.cache_clear() - __main__.main(['ll']) + __main__.main(["ll"]) out, err = capfd.readouterr() print(out) - assert err == '' + assert err == "" assert out == expected -@pytest.mark.parametrize('input, expected', [ - ({'repo1': {'path': '/a/'}, 'repo2': {'path': '/b/'}}, ''), - ]) -@patch('subprocess.run') -@patch('gita.utils.get_repos') +@pytest.mark.parametrize( + "input, expected", + [ + ({"repo1": {"path": "/a/"}, "repo2": {"path": "/b/"}}, ""), + ], +) +@patch("subprocess.run") +@patch("gita.utils.get_repos") def test_freeze(mock_repos, mock_run, input, expected, capfd): mock_repos.return_value = input - __main__.main(['freeze']) + __main__.main(["freeze"]) assert mock_run.call_count == 2 out, err = capfd.readouterr() - assert err == '' + assert err == "" assert out == expected -@patch('gita.utils.parse_clone_config', return_value=[ - ['git@github.com:user/repo.git', 'repo', '/a/repo']]) -@patch('gita.utils.run_async', new=async_mock()) -@patch('subprocess.run') -def test_clone(*_): - asyncio.set_event_loop(asyncio.new_event_loop()) +@patch("subprocess.run") +def test_clone_with_url(mock_run): args = argparse.Namespace() - args.fname = ['freeze_filename'] + args.clonee = "http://abc.com/repo1" args.preserve_path = None + args.directory = "/home/xxx" + args.from_file = False + __main__.f_clone(args) + cmds = ["git", "clone", args.clonee] + mock_run.assert_called_once_with(cmds, cwd=args.directory) + + +@patch( + "gita.utils.parse_clone_config", + return_value=[["git@github.com:user/repo.git", "repo", "/a/repo"]], +) +@patch("gita.utils.run_async", new=async_mock()) +@patch("subprocess.run") +def test_clone_with_config_file(*_): + asyncio.set_event_loop(asyncio.new_event_loop()) + args = argparse.Namespace() + args.clonee = "freeze_filename" + args.preserve_path = False + args.directory = None + args.from_file = True __main__.f_clone(args) mock_run = utils.run_async.mock assert mock_run.call_count == 1 - cmds = ['git', 'clone', 'git@github.com:user/repo.git'] - mock_run.assert_called_once_with('repo', Path.cwd(), cmds) + cmds = ["git", "clone", "git@github.com:user/repo.git"] + mock_run.assert_called_once_with("repo", Path.cwd(), cmds) -@patch('gita.utils.parse_clone_config', return_value=[ - ['git@github.com:user/repo.git', 'repo', '/a/repo']]) -@patch('gita.utils.run_async', new=async_mock()) -@patch('subprocess.run') +@patch( + "gita.utils.parse_clone_config", + return_value=[["git@github.com:user/repo.git", "repo", "/a/repo"]], +) +@patch("gita.utils.run_async", new=async_mock()) +@patch("subprocess.run") def test_clone_with_preserve_path(*_): asyncio.set_event_loop(asyncio.new_event_loop()) args = argparse.Namespace() - args.fname = ['freeze_filename'] + args.clonee = "freeze_filename" + args.directory = None + args.from_file = True args.preserve_path = True __main__.f_clone(args) mock_run = utils.run_async.mock assert mock_run.call_count == 1 - cmds = ['git', 'clone', 'git@github.com:user/repo.git', '/a/repo'] - mock_run.assert_called_once_with('repo', Path.cwd(), cmds) + cmds = ["git", "clone", "git@github.com:user/repo.git", "/a/repo"] + mock_run.assert_called_once_with("repo", Path.cwd(), cmds) -@patch('os.makedirs') -@patch('os.path.isfile', return_value=True) -@patch('gita.common.get_config_fname', return_value='some path') -@patch('gita.utils.get_repos', return_value={'repo1': {'path': '/a/', 'type': ''}, - 'repo2': {'path': '/b/', 'type': ''}}) -@patch('gita.utils.write_to_repo_file') +@patch("os.makedirs") +@patch("os.path.isfile", return_value=True) +@patch("gita.common.get_config_fname", return_value="some path") +@patch( + "gita.utils.get_repos", + return_value={ + "repo1": {"path": "/a/", "type": ""}, + "repo2": {"path": "/b/", "type": ""}, + }, +) +@patch("gita.utils.write_to_repo_file") def test_rm(mock_write, *_): args = argparse.Namespace() - args.repo = ['repo1'] + args.repo = ["repo1"] __main__.f_rm(args) - mock_write.assert_called_once_with( - {'repo2': {'path': '/b/', 'type': ''}}, 'w') + mock_write.assert_called_once_with({"repo2": {"path": "/b/", "type": ""}}, "w") def test_not_add(): # this won't write to disk because the repo is not valid - __main__.main(['add', '/home/some/repo/']) + __main__.main(["add", "/home/some/repo/"]) -@patch('gita.utils.get_repos', return_value={'repo2': {'path': '/d/efg', - 'flags': []}}) -@patch('subprocess.run') +@patch("gita.utils.get_repos", return_value={"repo2": {"path": "/d/efg", "flags": []}}) +@patch("subprocess.run") def test_fetch(mock_run, *_): asyncio.set_event_loop(asyncio.new_event_loop()) - __main__.main(['fetch']) - mock_run.assert_called_once_with(['git', 'fetch'], cwd='/d/efg', shell=False) + __main__.main(["fetch"]) + mock_run.assert_called_once_with(["git", "fetch"], cwd="/d/efg", shell=False) @patch( - 'gita.utils.get_repos', return_value={ - 'repo1': {'path': '/a/bc', 'flags': []}, - 'repo2': {'path': '/d/efg', 'flags': []} - }) -@patch('gita.utils.run_async', new=async_mock()) -@patch('subprocess.run') + "gita.utils.get_repos", + return_value={ + "repo1": {"path": "/a/bc", "flags": []}, + "repo2": {"path": "/d/efg", "flags": []}, + }, +) +@patch("gita.utils.run_async", new=async_mock()) +@patch("subprocess.run") def test_async_fetch(*_): - __main__.main(['fetch']) + __main__.main(["fetch"]) mock_run = utils.run_async.mock assert mock_run.call_count == 2 - cmds = ['git', 'fetch'] + cmds = ["git", "fetch"] # print(mock_run.call_args_list) - mock_run.assert_any_call('repo1', '/a/bc', cmds) - mock_run.assert_any_call('repo2', '/d/efg', cmds) + mock_run.assert_any_call("repo1", "/a/bc", cmds) + mock_run.assert_any_call("repo2", "/d/efg", cmds) -@pytest.mark.parametrize('input', [ - 'diff --name-only --staged', - "commit -am 'lala kaka'", -]) -@patch('gita.utils.get_repos', return_value={'repo7': {'path': 'path7', 'flags': []}}) -@patch('subprocess.run') +@pytest.mark.parametrize( + "input", + [ + "diff --name-only --staged", + "commit -am 'lala kaka'", + ], +) +@patch("gita.utils.get_repos", return_value={"repo7": {"path": "path7", "flags": []}}) +@patch("subprocess.run") def test_superman(mock_run, _, input): mock_run.reset_mock() - args = ['super', 'repo7'] + shlex.split(input) + args = ["super", "repo7"] + shlex.split(input) __main__.main(args) - expected_cmds = ['git'] + shlex.split(input) - mock_run.assert_called_once_with(expected_cmds, cwd='path7', shell=False) + expected_cmds = ["git"] + shlex.split(input) + mock_run.assert_called_once_with(expected_cmds, cwd="path7", shell=False) -@pytest.mark.parametrize('input', [ - 'diff --name-only --staged', - "commit -am 'lala kaka'", -]) -@patch('gita.utils.get_repos', return_value={'repo7': {'path': 'path7', 'flags': []}}) -@patch('subprocess.run') +@pytest.mark.parametrize( + "input", + [ + "diff --name-only --staged", + "commit -am 'lala kaka'", + ], +) +@patch("gita.utils.get_repos", return_value={"repo7": {"path": "path7", "flags": []}}) +@patch("subprocess.run") def test_shell(mock_run, _, input): mock_run.reset_mock() - args = ['shell', 'repo7', input] + args = ["shell", "repo7", input] __main__.main(args) expected_cmds = input - mock_run.assert_called_once_with(expected_cmds, cwd='path7', shell=True, stderr=-2, stdout=-1) + mock_run.assert_called_once_with( + expected_cmds, cwd="path7", shell=True, stderr=-2, stdout=-1 + ) class TestContext: - - @patch('gita.utils.get_context', return_value=None) + @patch("gita.utils.get_context", return_value=None) def test_display_no_context(self, _, capfd): - __main__.main(['context']) + __main__.main(["context"]) out, err = capfd.readouterr() - assert err == '' - assert 'Context is not set\n' == out + assert err == "" + assert "Context is not set\n" == out - @patch('gita.utils.get_context', return_value=Path('gname.context')) - @patch('gita.utils.get_groups', return_value={'gname': {'repos': ['a', 'b']}}) + @patch("gita.utils.get_context", return_value=Path("gname.context")) + @patch("gita.utils.get_groups", return_value={"gname": {"repos": ["a", "b"]}}) def test_display_context(self, _, __, capfd): - __main__.main(['context']) + __main__.main(["context"]) out, err = capfd.readouterr() - assert err == '' - assert 'gname: a b\n' == out + assert err == "" + assert "gname: a b\n" == out - @patch('gita.utils.get_context') + @patch("gita.utils.get_context") def test_reset(self, mock_ctx): - __main__.main(['context', 'none']) + __main__.main(["context", "none"]) mock_ctx.return_value.unlink.assert_called() - @patch('gita.utils.get_context', return_value=None) - @patch('gita.common.get_config_dir', return_value=TEST_DIR) - @patch('gita.utils.get_groups', return_value={'lala': ['b'], 'kaka': []}) + @patch("gita.utils.get_context", return_value=None) + @patch("gita.common.get_config_dir", return_value=TEST_DIR) + @patch("gita.utils.get_groups", return_value={"lala": ["b"], "kaka": []}) def test_set_first_time(self, *_): - ctx = TEST_DIR / 'lala.context' + ctx = TEST_DIR / "lala.context" assert not ctx.is_file() - __main__.main(['context', 'lala']) + __main__.main(["context", "lala"]) assert ctx.is_file() ctx.unlink() - @patch('gita.common.get_config_dir', return_value=TEST_DIR) - @patch('gita.utils.get_groups', return_value={'lala': ['b'], 'kaka': []}) - @patch('gita.utils.get_context') + @patch("gita.common.get_config_dir", return_value=TEST_DIR) + @patch("gita.utils.get_groups", return_value={"lala": ["b"], "kaka": []}) + @patch("gita.utils.get_context") def test_set_second_time(self, mock_ctx, *_): - __main__.main(['context', 'kaka']) + __main__.main(["context", "kaka"]) mock_ctx.return_value.rename.assert_called() class TestGroupCmd: - - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) def test_ls(self, _, capfd): args = argparse.Namespace() args.to_group = None - args.group_cmd = 'ls' + args.group_cmd = "ls" utils.get_groups.cache_clear() __main__.f_group(args) out, err = capfd.readouterr() - assert err == '' - assert 'xx yy\n' == out + assert err == "" + assert "xx yy\n" == out - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) def test_ll(self, _, capfd): args = argparse.Namespace() args.to_group = None @@ -321,175 +376,228 @@ class TestGroupCmd: utils.get_groups.cache_clear() __main__.f_group(args) out, err = capfd.readouterr() - assert err == '' - assert out == '\x1b[4mxx\x1b[0m: \n - a\n - b\n\x1b[4myy\x1b[0m: \n - a\n - c\n - d\n' + assert err == "" + assert ( + out + == "\x1b[4mxx\x1b[0m: \n - a\n - b\n\x1b[4myy\x1b[0m: \n - a\n - c\n - d\n" + ) - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) def test_ll_with_group(self, _, capfd): args = argparse.Namespace() args.to_group = None args.group_cmd = None - args.to_show = 'yy' + args.to_show = "yy" utils.get_groups.cache_clear() __main__.f_group(args) out, err = capfd.readouterr() - assert err == '' - assert 'a c d\n' == out + assert err == "" + assert "a c d\n" == out - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) - @patch('gita.utils.write_to_groups_file') + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) + @patch("gita.utils.write_to_groups_file") def test_rename(self, mock_write, _): args = argparse.Namespace() - args.gname = 'xx' - args.new_name = 'zz' - args.group_cmd = 'rename' + args.gname = "xx" + args.new_name = "zz" + args.group_cmd = "rename" utils.get_groups.cache_clear() __main__.f_group(args) - expected = {'yy': {'repos': ['a', 'c', 'd'], 'path': ''}, - 'zz': {'repos': ['a', 'b'], 'path': ''}} - mock_write.assert_called_once_with(expected, 'w') - - @patch('gita.info.get_color_encoding', return_value=info.default_colors) - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) + expected = { + "yy": {"repos": ["a", "c", "d"], "path": ""}, + "zz": {"repos": ["a", "b"], "path": ""}, + } + mock_write.assert_called_once_with(expected, "w") + + @patch("gita.info.get_color_encoding", return_value=info.default_colors) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) def test_rename_error(self, *_): utils.get_groups.cache_clear() with pytest.raises(SystemExit, match="1"): - __main__.main('group rename xx yy'.split()) - - @pytest.mark.parametrize('input, expected', [ - ('xx', {'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}), - ("xx yy", {}), - ]) - @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''}) - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) - @patch('gita.utils.write_to_groups_file') + __main__.main("group rename xx yy".split()) + + @pytest.mark.parametrize( + "input, expected", + [ + ("xx", {"yy": {"repos": ["a", "c", "d"], "path": ""}}), + ("xx yy", {}), + ], + ) + @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) + @patch("gita.utils.write_to_groups_file") def test_rm(self, mock_write, _, __, input, expected): utils.get_groups.cache_clear() - args = ['group', 'rm'] + shlex.split(input) + args = ["group", "rm"] + shlex.split(input) __main__.main(args) - mock_write.assert_called_once_with(expected, 'w') + mock_write.assert_called_once_with(expected, "w") - @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''}) - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) - @patch('gita.utils.write_to_groups_file') + @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) + @patch("gita.utils.write_to_groups_file") def test_add(self, mock_write, *_): args = argparse.Namespace() - args.to_group = ['a', 'c'] - args.group_cmd = 'add' - args.gname = 'zz' + args.to_group = ["a", "c"] + args.group_cmd = "add" + args.gname = "zz" utils.get_groups.cache_clear() __main__.f_group(args) mock_write.assert_called_once_with( - {'zz': {'repos': ['a', 'c'], 'path': ''}}, 'a+') + {"zz": {"repos": ["a", "c"], "path": ""}}, "a+" + ) - @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''}) - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) - @patch('gita.utils.write_to_groups_file') + @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) + @patch("gita.utils.write_to_groups_file") def test_add_to_existing(self, mock_write, *_): args = argparse.Namespace() - args.to_group = ['a', 'c'] - args.group_cmd = 'add' - args.gname = 'xx' + args.to_group = ["a", "c"] + args.group_cmd = "add" + args.gname = "xx" utils.get_groups.cache_clear() __main__.f_group(args) mock_write.assert_called_once_with( - {'xx': {'repos': ['a', 'b', 'c'], 'path': ''}, - 'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}, 'w') - - @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''}) - @patch('gita.common.get_config_fname', return_value=GROUP_FNAME) - @patch('gita.utils.write_to_groups_file') + { + "xx": {"repos": ["a", "b", "c"], "path": ""}, + "yy": {"repos": ["a", "c", "d"], "path": ""}, + }, + "w", + ) + + @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) + @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) + @patch("gita.utils.write_to_groups_file") def test_rm_repo(self, mock_write, *_): args = argparse.Namespace() - args.to_rm = ['a', 'c'] - args.group_cmd = 'rmrepo' - args.gname = 'xx' + args.to_rm = ["a", "c"] + args.group_cmd = "rmrepo" + args.gname = "xx" utils.get_groups.cache_clear() __main__.f_group(args) mock_write.assert_called_once_with( - {'xx': {'repos': ['b'], 'path': ''}, - 'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}, 'w') - - @patch('gita.common.get_config_fname') + { + "xx": {"repos": ["b"], "path": ""}, + "yy": {"repos": ["a", "c", "d"], "path": ""}, + }, + "w", + ) + + @patch("gita.common.get_config_fname") def test_integration(self, mock_path_fname, tmp_path, capfd): def side_effect(input, _=None): - return tmp_path / f'{input}.csv' + return tmp_path / f"{input}.csv" + mock_path_fname.side_effect = side_effect - __main__.main('add .'.split()) + __main__.main("add .".split()) utils.get_repos.cache_clear() - __main__.main('group add gita -n test'.split()) + __main__.main("group add gita -n test".split()) utils.get_groups.cache_clear() - __main__.main('ll test'.split()) + __main__.main("ll test".split()) out, err = capfd.readouterr() - assert err == '' - assert 'gita' in out + assert err == "" + assert "gita" in out -@patch('gita.utils.is_git', return_value=True) -@patch('gita.common.get_config_fname', return_value=PATH_FNAME) -@patch('gita.utils.rename_repo') +@patch("gita.utils.is_git", return_value=True) +@patch("gita.common.get_config_fname", return_value=PATH_FNAME) +@patch("gita.utils.rename_repo") def test_rename(mock_rename, _, __): utils.get_repos.cache_clear() - args = ['rename', 'repo1', 'abc'] + args = ["rename", "repo1", "abc"] __main__.main(args) mock_rename.assert_called_once_with( - {'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []}, - 'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []}, - 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []}}, - 'repo1', 'abc') + { + "repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []}, + "xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []}, + "repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []}, + }, + "repo1", + "abc", + ) class TestInfo: - - @patch('gita.common.get_config_fname', return_value='') + @patch("gita.common.get_config_fname", return_value="") def test_ll(self, _, capfd): args = argparse.Namespace() args.info_cmd = None __main__.f_info(args) out, err = capfd.readouterr() - assert 'In use: branch,commit_msg,commit_time\nUnused: path\n' == out - assert err == '' + assert "In use: branch,commit_msg,commit_time\nUnused: path\n" == out + assert err == "" - @patch('gita.common.get_config_fname') + @patch("gita.common.get_config_fname") def test_add(self, mock_get_fname, tmpdir): args = argparse.Namespace() - args.info_cmd = 'add' - args.info_item = 'path' + args.info_cmd = "add" + args.info_item = "path" with tmpdir.as_cwd(): - csv_config = Path.cwd() / 'info.csv' + csv_config = Path.cwd() / "info.csv" mock_get_fname.return_value = csv_config __main__.f_info(args) items = info.get_info_items() - assert items == ['branch', 'commit_msg', 'commit_time', 'path'] + assert items == ["branch", "commit_msg", "commit_time", "path"] - @patch('gita.common.get_config_fname') + @patch("gita.common.get_config_fname") def test_rm(self, mock_get_fname, tmpdir): args = argparse.Namespace() - args.info_cmd = 'rm' - args.info_item = 'commit_msg' + args.info_cmd = "rm" + args.info_item = "commit_msg" with tmpdir.as_cwd(): - csv_config = Path.cwd() / 'info.csv' + csv_config = Path.cwd() / "info.csv" mock_get_fname.return_value = csv_config __main__.f_info(args) items = info.get_info_items() - assert items == ['branch', 'commit_time'] + assert items == ["branch", "commit_time"] -@patch('gita.common.get_config_fname') +@patch("gita.common.get_config_fname") def test_set_color(mock_get_fname, tmpdir): - args = argparse.Namespace() - args.color_cmd = 'set' - args.color = 'b_white' - args.situation = 'no-remote' - with tmpdir.as_cwd(): - csv_config = Path.cwd() / 'colors.csv' - mock_get_fname.return_value = csv_config - __main__.f_color(args) + args = argparse.Namespace() + args.color_cmd = "set" + args.color = "b_white" + args.situation = "no-remote" + with tmpdir.as_cwd(): + csv_config = Path.cwd() / "colors.csv" + mock_get_fname.return_value = csv_config + __main__.f_color(args) - info.get_color_encoding.cache_clear() # avoid side effect - items = info.get_color_encoding() info.get_color_encoding.cache_clear() # avoid side effect - assert items == {'no-remote': 'b_white', 'in-sync': 'green', - 'diverged': 'red', 'local-ahead': 'purple', - 'remote-ahead': 'yellow'} + items = info.get_color_encoding() + info.get_color_encoding.cache_clear() # avoid side effect + assert items == { + "no-remote": "b_white", + "in-sync": "green", + "diverged": "red", + "local-ahead": "purple", + "remote-ahead": "yellow", + } + + +@pytest.mark.parametrize( + "input, expected", + [ + ({"repo1": {"path": "/a/"}, "repo2": {"path": "/b/"}}, ""), + ], +) +@patch("gita.utils.write_to_groups_file") +@patch("gita.utils.write_to_repo_file") +@patch("gita.utils.get_repos") +def test_clear( + mock_repos, + mock_write_to_repo_file, + mock_write_to_groups_file, + input, + expected, + capfd, +): + mock_repos.return_value = input + __main__.main(["clear"]) + assert mock_write_to_repo_file.call_count == 1 + mock_write_to_repo_file.assert_called_once_with({}, "w") + assert mock_write_to_groups_file.call_count == 1 + mock_write_to_groups_file.assert_called_once_with({}, "w") + out, err = capfd.readouterr() + assert err == "" + assert out == expected |