summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--README.md7
-rw-r--r--doc/README_CN.md7
-rw-r--r--doc/video-outline.pngbin168883 -> 178810 bytes
-rw-r--r--gita/__main__.py758
-rw-r--r--gita/cmds.json1
-rw-r--r--gita/utils.py13
-rw-r--r--setup.py28
-rw-r--r--tests/test_main.py624
8 files changed, 841 insertions, 597 deletions
diff --git a/README.md b/README.md
index 746af4b..d1cd42e 100644
--- a/README.md
+++ b/README.md
@@ -70,8 +70,11 @@ The bookkeeping sub-commands are
and automatically generate hierarchical groups. See the [customization section](#custom) for more details.
- `gita add -b <bare-repo-path(s)>`: add bare repo(s) to `gita`. See the [customization section](#custom) for more details on setting custom worktree.
- `gita add -r <repo-parent-path(s)>`: add repo(s) in <repo-parent-path(s)> recursively
-- `gita clone <config-file>`: clone repos in `config-file` (generated by `gita freeze`) to current directory.
-- `gita clone -p <config-file>`: clone repos in `config-file` to prescribed paths.
+- `gita clear`: remove all groups and repos
+- `gita clone <URL>`: clone repo from `URL` at current working directory
+- `gita clone <URL> -C <directory>`: change to `directory` and then clone repo
+- `gita clone -f <config-file>`: clone repos in `config-file` (generated by `gita freeze`) to current directory.
+- `gita clone -p -f <config-file>`: clone repos in `config-file` to prescribed paths.
- `gita context`: context sub-command
- `gita context`: show current context
- `gita context <group-name>`: set context to `group-name`, all operations then only apply to repos in this group
diff --git a/doc/README_CN.md b/doc/README_CN.md
index 23a9102..fbeee09 100644
--- a/doc/README_CN.md
+++ b/doc/README_CN.md
@@ -57,8 +57,11 @@
- `gita add -b <bare-repo-path(s)>`: 添加bare库
[customization section](#custom)
- `gita add -r <repo-parent-path(s)>`: 递归添加路径下的所有库
-- `gita clone <config-file>`: 克隆`<config-file>` (由`gita freeze`生成)里的库
-- `gita clone -p <config-file>`: 克隆`<config-file>`里的库并放到指定路径
+- `gita clear`:
+- `gita clone <URL>`:
+- `gita clone <URL> -C <directory>`:
+- `gita clone -f <config-file>`: 克隆`<config-file>` (由`gita freeze`生成)里的库
+- `gita clone -p -f <config-file>`: 克隆`<config-file>`里的库并放到指定路径
- `gita context`: 情境命令
- `gita context`: 显示当前的情境
- `gita context none`: 去除情境
diff --git a/doc/video-outline.png b/doc/video-outline.png
index 256be4a..a54ea23 100644
--- a/doc/video-outline.png
+++ b/doc/video-outline.png
Binary files differ
diff --git a/gita/__main__.py b/gita/__main__.py
index b4057ec..2517958 100644
--- a/gita/__main__.py
+++ b/gita/__main__.py
@@ -1,4 +1,4 @@
-'''
+"""
Gita manages multiple git repos. It has two functionalities
1. display the status of multiple repos side by side
@@ -12,7 +12,7 @@ Examples:
For bash auto completion, download and source
https://github.com/nosarthur/gita/blob/master/.gita-completion.bash
-'''
+"""
import os
import sys
@@ -40,7 +40,7 @@ def _group_name(name: str, exclude_old_names=True) -> str:
if name in utils.get_groups():
print(f"Cannot use group name {name} since it's already in use.")
sys.exit(1)
- if name in {'none', 'auto'}:
+ if name in {"none", "auto"}:
print(f"Cannot use group name {name} since it's a reserved keyword.")
sys.exit(1)
return name
@@ -52,31 +52,42 @@ def _path_name(name: str) -> str:
"""
if name:
return os.path.abspath(name).rstrip(os.path.sep)
- return ''
+ return ""
def f_add(args: argparse.Namespace):
repos = utils.get_repos()
paths = args.paths
+ dry_run = args.dry_run
groups = utils.get_groups()
if args.recursive or args.auto_group:
- paths = (p.rstrip(os.path.sep) for p in chain.from_iterable(
- glob.glob(os.path.join(p, '**/'), recursive=True)
- for p in args.paths))
- new_repos = utils.add_repos(repos, paths, include_bare=args.bare,
- exclude_submodule=args.skip_submodule)
+ paths = (
+ p.rstrip(os.path.sep)
+ for p in chain.from_iterable(
+ glob.glob(os.path.join(p, "**/"), recursive=True) for p in args.paths
+ )
+ )
+ new_repos = utils.add_repos(
+ repos,
+ paths,
+ include_bare=args.bare,
+ exclude_submodule=args.skip_submodule,
+ dry_run=dry_run,
+ )
+ if dry_run:
+ return
if new_repos and args.auto_group:
new_groups = utils.auto_group(new_repos, args.paths)
if new_groups:
- print(f'Created {len(new_groups)} new group(s).')
- utils.write_to_groups_file(new_groups, 'a+')
+ print(f"Created {len(new_groups)} new group(s).")
+ utils.write_to_groups_file(new_groups, "a+")
if new_repos and args.group:
gname = args.group
- gname_repos = set(groups[gname]['repos'])
+ gname_repos = set(groups[gname]["repos"])
gname_repos.update(new_repos)
- groups[gname]['repos'] = sorted(gname_repos)
- print(f'Added {len(new_repos)} repos to the {gname} group')
- utils.write_to_groups_file(groups, 'w')
+ groups[gname]["repos"] = sorted(gname_repos)
+ print(f"Added {len(new_repos)} repos to the {gname} group")
+ utils.write_to_groups_file(groups, "w")
def f_rename(args: argparse.Namespace):
@@ -85,87 +96,101 @@ def f_rename(args: argparse.Namespace):
def f_flags(args: argparse.Namespace):
- cmd = args.flags_cmd or 'll'
+ cmd = args.flags_cmd or "ll"
repos = utils.get_repos()
- if cmd == 'll':
+ if cmd == "ll":
for r, prop in repos.items():
- if prop['flags']:
+ if prop["flags"]:
print(f"{r}: {prop['flags']}")
- elif cmd == 'set':
+ elif cmd == "set":
# when in memory, flags are List[str], when on disk, they are space
# delimited str
- repos[args.repo]['flags'] = args.flags
- utils.write_to_repo_file(repos, 'w')
+ repos[args.repo]["flags"] = args.flags
+ utils.write_to_repo_file(repos, "w")
def f_color(args: argparse.Namespace):
- cmd = args.color_cmd or 'll'
- if cmd == 'll': # pragma: no cover
+ cmd = args.color_cmd or "ll"
+ if cmd == "ll": # pragma: no cover
info.show_colors()
- elif cmd == 'set':
+ elif cmd == "set":
colors = info.get_color_encoding()
colors[args.situation] = args.color
- csv_config = common.get_config_fname('color.csv')
- with open(csv_config, 'w', newline='') as f:
+ csv_config = common.get_config_fname("color.csv")
+ with open(csv_config, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=colors)
writer.writeheader()
writer.writerow(colors)
- elif cmd == 'reset':
- Path(common.get_config_fname('color.csv')).unlink(missing_ok=True)
+ elif cmd == "reset":
+ Path(common.get_config_fname("color.csv")).unlink(missing_ok=True)
def f_info(args: argparse.Namespace):
to_display = info.get_info_items()
- cmd = args.info_cmd or 'll'
- if cmd == 'll':
- print('In use:', ','.join(to_display))
+ cmd = args.info_cmd or "ll"
+ if cmd == "ll":
+ print("In use:", ",".join(to_display))
unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display)))
if unused:
- print('Unused:', ','.join(unused))
+ print("Unused:", ",".join(unused))
return
- if cmd == 'add' and args.info_item not in to_display:
+ if cmd == "add" and args.info_item not in to_display:
to_display.append(args.info_item)
- csv_config = common.get_config_fname('info.csv')
- with open(csv_config, 'w', newline='') as f:
+ csv_config = common.get_config_fname("info.csv")
+ with open(csv_config, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(to_display)
- elif cmd == 'rm' and args.info_item in to_display:
+ elif cmd == "rm" and args.info_item in to_display:
to_display.remove(args.info_item)
- csv_config = common.get_config_fname('info.csv')
- with open(csv_config, 'w', newline='') as f:
+ csv_config = common.get_config_fname("info.csv")
+ with open(csv_config, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(to_display)
def f_clone(args: argparse.Namespace):
- path = Path.cwd()
+ if args.directory:
+ path = args.directory
+ else:
+ path = Path.cwd()
+ if not args.from_file:
+ subprocess.run(["git", "clone", args.clonee], cwd=path)
+ return
+
if args.preserve_path:
utils.exec_async_tasks(
- utils.run_async(repo_name, path, ['git', 'clone', url, abs_path])
- for url, repo_name, abs_path in utils.parse_clone_config(args.fname))
+ utils.run_async(repo_name, path, ["git", "clone", url, abs_path])
+ for url, repo_name, abs_path in utils.parse_clone_config(args.clonee)
+ )
else:
utils.exec_async_tasks(
- utils.run_async(repo_name, path, ['git', 'clone', url])
- for url, repo_name, _ in utils.parse_clone_config(args.fname))
+ utils.run_async(repo_name, path, ["git", "clone", url])
+ for url, repo_name, _ in utils.parse_clone_config(args.clonee)
+ )
def f_freeze(_):
+ # TODO: filter context
repos = utils.get_repos()
- seen = {''}
+ seen = {""}
for name, prop in repos.items():
- path = prop['path']
- url = ''
+ path = prop["path"]
+ url = ""
# FIXME: capture_output is new in 3.7. Maybe drop support for 3.6
- cp = subprocess.run(['git', 'remote', '-v'], cwd=path,
- universal_newlines=True, capture_output=True)
- lines = cp.stdout.split('\n')
+ cp = subprocess.run(
+ ["git", "remote", "-v"],
+ cwd=path,
+ universal_newlines=True,
+ capture_output=True,
+ )
+ lines = cp.stdout.split("\n")
if cp.returncode == 0 and len(lines) > 0:
parts = lines[0].split()
- if len(parts)>1:
+ if len(parts) > 1:
url = parts[1]
if url not in seen:
seen.add(url)
- print(f'{url},{name},{path}')
+ print(f"{url},{name},{path}")
def f_ll(args: argparse.Namespace):
@@ -178,19 +203,19 @@ def f_ll(args: argparse.Namespace):
args.group = ctx.stem
group_repos = None
if args.group: # only display repos in this group
- group_repos = utils.get_groups()[args.group]['repos']
+ group_repos = utils.get_groups()[args.group]["repos"]
repos = {k: repos[k] for k in group_repos if k in repos}
if args.g: # display by group
if group_repos:
- print(f'{args.group}:')
+ print(f"{args.group}:")
for line in utils.describe(repos, no_colors=args.no_colors):
- print(' ', line)
+ print(" ", line)
else:
for g, prop in utils.get_groups().items():
- print(f'{g}:')
- g_repos = {k: repos[k] for k in prop['repos'] if k in repos}
+ print(f"{g}:")
+ g_repos = {k: repos[k] for k in prop["repos"] if k in repos}
for line in utils.describe(g_repos, no_colors=args.no_colors):
- print(' ', line)
+ print(" ", line)
else:
for line in utils.describe(repos, no_colors=args.no_colors):
print(line)
@@ -199,69 +224,68 @@ def f_ll(args: argparse.Namespace):
def f_ls(args: argparse.Namespace):
repos = utils.get_repos()
if args.repo: # one repo, show its path
- print(repos[args.repo]['path'])
+ print(repos[args.repo]["path"])
else: # show names of all repos
- print(' '.join(repos))
+ print(" ".join(repos))
def f_group(args: argparse.Namespace):
groups = utils.get_groups()
- cmd = args.group_cmd or 'll'
- if cmd == 'll':
- if 'to_show' in args and args.to_show:
+ cmd = args.group_cmd or "ll"
+ if cmd == "ll":
+ if "to_show" in args and args.to_show:
gname = args.to_show
- print(' '.join(groups[gname]['repos']))
+ print(" ".join(groups[gname]["repos"]))
else:
for group, prop in groups.items():
print(f"{info.Color.underline}{group}{info.Color.end}: {prop['path']}")
- for r in prop['repos']:
- print(' -', r)
- elif cmd == 'ls':
- print(' '.join(groups))
- elif cmd == 'rename':
+ for r in prop["repos"]:
+ print(" -", r)
+ elif cmd == "ls":
+ print(" ".join(groups))
+ elif cmd == "rename":
new_name = args.new_name
gname = args.gname
groups[new_name] = groups[gname]
del groups[gname]
- utils.write_to_groups_file(groups, 'w')
+ utils.write_to_groups_file(groups, "w")
# change context
ctx = utils.get_context()
if ctx and ctx.stem == gname:
utils.replace_context(ctx, new_name)
- elif cmd == 'rm':
+ elif cmd == "rm":
ctx = utils.get_context()
for name in args.to_ungroup:
del groups[name]
if ctx and str(ctx.stem) == name:
- utils.replace_context(ctx, '')
- utils.write_to_groups_file(groups, 'w')
- elif cmd == 'add':
+ utils.replace_context(ctx, "")
+ utils.write_to_groups_file(groups, "w")
+ elif cmd == "add":
gname = args.gname
if gname in groups:
- gname_repos = set(groups[gname]['repos'])
+ gname_repos = set(groups[gname]["repos"])
gname_repos.update(args.to_group)
- groups[gname]['repos'] = sorted(gname_repos)
- if 'gpath' in args:
- groups[gname]['path'] = args.gpath
- utils.write_to_groups_file(groups, 'w')
+ groups[gname]["repos"] = sorted(gname_repos)
+ if "gpath" in args:
+ groups[gname]["path"] = args.gpath
+ utils.write_to_groups_file(groups, "w")
else:
- gpath = ''
- if 'gpath' in args:
+ gpath = ""
+ if "gpath" in args:
gpath = args.gpath
utils.write_to_groups_file(
- {gname: {'repos': sorted(args.to_group),
- 'path': gpath}},
- 'a+')
- elif cmd == 'rmrepo':
+ {gname: {"repos": sorted(args.to_group), "path": gpath}}, "a+"
+ )
+ elif cmd == "rmrepo":
gname = args.gname
if gname in groups:
- group = {gname: {'repos': groups[gname]['repos'],
- 'path': groups[gname]['path']
- }}
+ group = {
+ gname: {"repos": groups[gname]["repos"], "path": groups[gname]["path"]}
+ }
for repo in args.to_rm:
utils.delete_repo_from_groups(repo, group)
groups[gname] = group[gname]
- utils.write_to_groups_file(groups, 'w')
+ utils.write_to_groups_file(groups, "w")
def f_context(args: argparse.Namespace):
@@ -271,10 +295,10 @@ def f_context(args: argparse.Namespace):
if ctx:
group = ctx.stem
print(f"{group}: {' '.join(utils.get_groups()[group]['repos'])}")
- elif (Path(common.get_config_dir()) / 'auto.context').exists():
- print('auto: none detected!')
+ elif (Path(common.get_config_dir()) / "auto.context").exists():
+ print("auto: none detected!")
else:
- print('Context is not set')
+ print("Context is not set")
else: # set context
utils.replace_context(ctx, choice)
@@ -283,7 +307,7 @@ def f_rm(args: argparse.Namespace):
"""
Unregister repo(s) from gita
"""
- path_file = common.get_config_fname('repos.csv')
+ path_file = common.get_config_fname("repos.csv")
if os.path.isfile(path_file):
repos = utils.get_repos()
group_updated = False
@@ -293,9 +317,9 @@ def f_rm(args: argparse.Namespace):
up = utils.delete_repo_from_groups(repo, groups)
group_updated = group_updated or up
if group_updated:
- utils.write_to_groups_file(groups, 'w')
+ utils.write_to_groups_file(groups, "w")
- utils.write_to_repo_file(repos, 'w')
+ utils.write_to_repo_file(repos, "w")
def f_git_cmd(args: argparse.Namespace):
@@ -303,7 +327,7 @@ def f_git_cmd(args: argparse.Namespace):
Delegate git command/alias defined in `args.cmd`. Asynchronous execution is
disabled for commands in the `args.async_blacklist`.
"""
- if '_parsed_repos' in args:
+ if "_parsed_repos" in args:
repos = args._parsed_repos
else:
repos, _ = utils.parse_repos_and_rest(args.repo)
@@ -311,15 +335,15 @@ def f_git_cmd(args: argparse.Namespace):
per_repo_cmds = []
for prop in repos.values():
cmds = args.cmd.copy()
- if cmds[0] == 'git' and prop['flags']:
- cmds[1:1] = prop['flags']
+ if cmds[0] == "git" and prop["flags"]:
+ cmds[1:1] = prop["flags"]
per_repo_cmds.append(cmds)
# This async blacklist mechanism is broken if the git command name does
# not match with the gita command name.
if len(repos) == 1 or args.cmd[1] in args.async_blacklist:
for prop, cmds in zip(repos.values(), per_repo_cmds):
- path = prop['path']
+ path = prop["path"]
print(path)
subprocess.run(cmds, cwd=path, shell=args.shell)
else: # run concurrent subprocesses
@@ -327,8 +351,9 @@ def f_git_cmd(args: argparse.Namespace):
# Here we shut off any user input in the async execution, and re-run
# the failed ones synchronously.
errors = utils.exec_async_tasks(
- utils.run_async(repo_name, prop['path'], cmds)
- for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items()))
+ utils.run_async(repo_name, prop["path"], cmds)
+ for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items())
+ )
for path in errors:
if path:
print(path)
@@ -342,17 +367,21 @@ def f_shell(args):
Delegate shell command defined in `args.man`, which may or may not
contain repo names.
"""
- repos, cmds = utils.parse_repos_and_rest(args.man)
+ repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode)
if not cmds:
- print('Missing commands')
+ print("Missing commands")
sys.exit(2)
- cmds = ' '.join(cmds) # join the shell command into a single string
+ cmds = " ".join(cmds) # join the shell command into a single string
for name, prop in repos.items():
# TODO: pull this out as a function
- got = subprocess.run(cmds, cwd=prop['path'], shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ got = subprocess.run(
+ cmds,
+ cwd=prop["path"],
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
print(utils.format_output(got.stdout.decode(), name))
@@ -361,135 +390,176 @@ def f_super(args):
Delegate git command/alias defined in `args.man`, which may or may not
contain repo names.
"""
- repos, cmds = utils.parse_repos_and_rest(args.man)
+ repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode)
if not cmds:
- print('Missing commands')
+ print("Missing commands")
sys.exit(2)
- args.cmd = ['git'] + cmds
+ args.cmd = ["git"] + cmds
args._parsed_repos = repos
args.shell = False
f_git_cmd(args)
+def f_clear(_):
+ utils.write_to_groups_file({}, "w")
+ utils.write_to_repo_file({}, "w")
+
+
def main(argv=None):
- p = argparse.ArgumentParser(prog='gita',
- formatter_class=argparse.RawTextHelpFormatter,
- description=__doc__)
- subparsers = p.add_subparsers(title='sub-commands',
- help='additional help with sub-command -h')
-
- version = pkg_resources.require('gita')[0].version
- p.add_argument('-v',
- '--version',
- action='version',
- version=f'%(prog)s {version}')
+ p = argparse.ArgumentParser(
+ prog="gita", formatter_class=argparse.RawTextHelpFormatter, description=__doc__
+ )
+ subparsers = p.add_subparsers(
+ title="sub-commands", help="additional help with sub-command -h"
+ )
+
+ version = pkg_resources.require("gita")[0].version
+ p.add_argument("-v", "--version", action="version", version=f"%(prog)s {version}")
# bookkeeping sub-commands
- p_add = subparsers.add_parser('add', description='add repo(s)',
- help='add repo(s)')
- p_add.add_argument('paths', nargs='+', type=_path_name, help="repo(s) to add")
- p_add.add_argument('-g','--group',
- choices=utils.get_groups(),
- help="add repo(s) to the specified group")
- p_add.add_argument('-s', '--skip-submodule', action='store_true',
- help="skip submodule repo(s)")
+ p_add = subparsers.add_parser("add", description="add repo(s)", help="add repo(s)")
+ p_add.add_argument("paths", nargs="+", type=_path_name, help="repo(s) to add")
+ p_add.add_argument("-n", "--dry-run", action="store_true", help="dry run")
+ p_add.add_argument(
+ "-g",
+ "--group",
+ choices=utils.get_groups(),
+ help="add repo(s) to the specified group",
+ )
+ p_add.add_argument(
+ "-s", "--skip-submodule", action="store_true", help="skip submodule repo(s)"
+ )
xgroup = p_add.add_mutually_exclusive_group()
- xgroup.add_argument('-r', '--recursive', action='store_true',
- help="recursively add repo(s) in the given path(s).")
- xgroup.add_argument('-a', '--auto-group', action='store_true',
- help="recursively add repo(s) in the given path(s) "
- "and create hierarchical groups based on folder structure.")
- xgroup.add_argument('-b', '--bare', action='store_true',
- help="add bare repo(s)")
+ xgroup.add_argument(
+ "-r",
+ "--recursive",
+ action="store_true",
+ help="recursively add repo(s) in the given path(s).",
+ )
+ xgroup.add_argument(
+ "-a",
+ "--auto-group",
+ action="store_true",
+ help="recursively add repo(s) in the given path(s) "
+ "and create hierarchical groups based on folder structure.",
+ )
+ xgroup.add_argument("-b", "--bare", action="store_true", help="add bare repo(s)")
p_add.set_defaults(func=f_add)
- p_rm = subparsers.add_parser('rm', description='remove repo(s)',
- help='remove repo(s)')
- p_rm.add_argument('repo',
- nargs='+',
- choices=utils.get_repos(),
- help="remove the chosen repo(s)")
+ p_rm = subparsers.add_parser(
+ "rm", description="remove repo(s)", help="remove repo(s)"
+ )
+ p_rm.add_argument(
+ "repo", nargs="+", choices=utils.get_repos(), help="remove the chosen repo(s)"
+ )
p_rm.set_defaults(func=f_rm)
- p_freeze = subparsers.add_parser('freeze',
- description='print all repo information',
- help='print all repo information')
+ p_freeze = subparsers.add_parser(
+ "freeze",
+ description="print all repo information",
+ help="print all repo information",
+ )
p_freeze.set_defaults(func=f_freeze)
- p_clone = subparsers.add_parser('clone',
- description='clone repos from config file',
- help='clone repos from config file')
- p_clone.add_argument('fname',
- help='config file. Its content should be the output of `gita freeze`.')
- p_clone.add_argument('-p', '--preserve-path', dest='preserve_path', action='store_true',
- help="clone repo(s) in their original paths")
+ p_clone = subparsers.add_parser(
+ "clone", description="clone repos", help="clone repos"
+ )
+ p_clone.add_argument(
+ "clonee",
+ help="A URL or a config file.",
+ )
+ p_clone.add_argument(
+ "-C",
+ "--directory",
+ help="Change to DIRECTORY before doing anything.",
+ )
+ p_clone.add_argument(
+ "-f",
+ "--from-file",
+ action="store_true",
+ help="If set, clone repos in a config file rendered from `gita freeze`",
+ )
+ p_clone.add_argument(
+ "-p",
+ "--preserve-path",
+ dest="preserve_path",
+ action="store_true",
+ help="clone repo(s) in their original paths",
+ )
p_clone.set_defaults(func=f_clone)
- p_rename = subparsers.add_parser('rename', description='rename a repo',
- help='rename a repo')
+ p_rename = subparsers.add_parser(
+ "rename", description="rename a repo", help="rename a repo"
+ )
p_rename.add_argument(
- 'repo',
- nargs=1,
- choices=utils.get_repos(),
- help="rename the chosen repo")
- p_rename.add_argument('new_name', help="new name")
+ "repo", nargs=1, choices=utils.get_repos(), help="rename the chosen repo"
+ )
+ p_rename.add_argument("new_name", help="new name")
p_rename.set_defaults(func=f_rename)
- p_flags = subparsers.add_parser('flags',
- description='Set custom git flags for repo.',
- help='git flags configuration')
+ p_flags = subparsers.add_parser(
+ "flags",
+ description="Set custom git flags for repo.",
+ help="git flags configuration",
+ )
p_flags.set_defaults(func=f_flags)
- flags_cmds = p_flags.add_subparsers(dest='flags_cmd',
- help='additional help with sub-command -h')
- flags_cmds.add_parser('ll',
- description='display repos with custom flags')
- pf_set = flags_cmds.add_parser('set',
- description='Set flags for repo.')
- pf_set.add_argument('repo', choices=utils.get_repos(),
- help="repo name")
- pf_set.add_argument('flags',
- nargs=argparse.REMAINDER,
- help="custom flags, use quotes")
-
- p_color = subparsers.add_parser('color',
- description='display and modify branch coloring of the ll sub-command.',
- help='color configuration')
+ flags_cmds = p_flags.add_subparsers(
+ dest="flags_cmd", help="additional help with sub-command -h"
+ )
+ flags_cmds.add_parser("ll", description="display repos with custom flags")
+ pf_set = flags_cmds.add_parser("set", description="Set flags for repo.")
+ pf_set.add_argument("repo", choices=utils.get_repos(), help="repo name")
+ pf_set.add_argument(
+ "flags", nargs=argparse.REMAINDER, help="custom flags, use quotes"
+ )
+
+ p_color = subparsers.add_parser(
+ "color",
+ description="display and modify branch coloring of the ll sub-command.",
+ help="color configuration",
+ )
p_color.set_defaults(func=f_color)
- color_cmds = p_color.add_subparsers(dest='color_cmd',
- help='additional help with sub-command -h')
- color_cmds.add_parser('ll',
- description='display available colors and the current branch coloring in the ll sub-command')
- color_cmds.add_parser('reset',
- description='reset color scheme.')
- pc_set = color_cmds.add_parser('set',
- description='Set color for local/remote situation.')
- pc_set.add_argument('situation',
- choices=info.get_color_encoding(),
- help="5 possible local/remote situations")
- pc_set.add_argument('color',
- choices=[c.name for c in info.Color],
- help="available colors")
-
- p_info = subparsers.add_parser('info',
- description='list, add, or remove information items of the ll sub-command.',
- help='information setting')
+ color_cmds = p_color.add_subparsers(
+ dest="color_cmd", help="additional help with sub-command -h"
+ )
+ color_cmds.add_parser(
+ "ll",
+ description="display available colors and the current branch coloring in the ll sub-command",
+ )
+ color_cmds.add_parser("reset", description="reset color scheme.")
+ pc_set = color_cmds.add_parser(
+ "set", description="Set color for local/remote situation."
+ )
+ pc_set.add_argument(
+ "situation",
+ choices=info.get_color_encoding(),
+ help="5 possible local/remote situations",
+ )
+ pc_set.add_argument(
+ "color", choices=[c.name for c in info.Color], help="available colors"
+ )
+
+ p_info = subparsers.add_parser(
+ "info",
+ description="list, add, or remove information items of the ll sub-command.",
+ help="information setting",
+ )
p_info.set_defaults(func=f_info)
- info_cmds = p_info.add_subparsers(dest='info_cmd',
- help='additional help with sub-command -h')
- info_cmds.add_parser('ll',
- description='show used and unused information items of the ll sub-command')
- info_cmds.add_parser('add', description='Enable information item.'
- ).add_argument('info_item',
- choices=info.ALL_INFO_ITEMS,
- help="information item to add")
- info_cmds.add_parser('rm', description='Disable information item.'
- ).add_argument('info_item',
- choices=info.ALL_INFO_ITEMS,
- help="information item to delete")
-
-
- ll_doc = f''' status symbols:
+ info_cmds = p_info.add_subparsers(
+ dest="info_cmd", help="additional help with sub-command -h"
+ )
+ info_cmds.add_parser(
+ "ll", description="show used and unused information items of the ll sub-command"
+ )
+ info_cmds.add_parser("add", description="Enable information item.").add_argument(
+ "info_item", choices=info.ALL_INFO_ITEMS, help="information item to add"
+ )
+ info_cmds.add_parser("rm", description="Disable information item.").add_argument(
+ "info_item", choices=info.ALL_INFO_ITEMS, help="information item to delete"
+ )
+
+ ll_doc = f""" status symbols:
+: staged changes
*: unstaged changes
_: untracked files/folders
@@ -499,146 +569,196 @@ def main(argv=None):
{info.Color.green}green{info.Color.end}: local is the same as remote
{info.Color.red}red{info.Color.end}: local has diverged from remote
{info.Color.purple}purple{info.Color.end}: local is ahead of remote (good for push)
- {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)'''
- p_ll = subparsers.add_parser('ll',
- help='display summary of all repos',
- formatter_class=argparse.RawTextHelpFormatter,
- description=ll_doc)
- p_ll.add_argument('group',
- nargs='?',
- choices=utils.get_groups(),
- help="show repos in the chosen group")
- p_ll.add_argument('-C', '--no-colors', action='store_true',
- help='Disable coloring on the branch names.')
- p_ll.add_argument('-g', action='store_true',
- help='Show repo summaries by group.')
+ {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)"""
+ p_ll = subparsers.add_parser(
+ "ll",
+ help="display summary of all repos",
+ formatter_class=argparse.RawTextHelpFormatter,
+ description=ll_doc,
+ )
+ p_ll.add_argument(
+ "group",
+ nargs="?",
+ choices=utils.get_groups(),
+ help="show repos in the chosen group",
+ )
+ p_ll.add_argument(
+ "-C",
+ "--no-colors",
+ action="store_true",
+ help="Disable coloring on the branch names.",
+ )
+ p_ll.add_argument("-g", action="store_true", help="Show repo summaries by group.")
p_ll.set_defaults(func=f_ll)
- p_context = subparsers.add_parser('context',
- help='set context',
- description='Set and remove context. A context is a group.'
- ' When set, all operations apply only to repos in that group.')
- p_context.add_argument('choice',
- nargs='?',
- choices=set().union(utils.get_groups(), {'none', 'auto'}),
- help="Without this argument, show current context. "
- "Otherwise choose a group as context, or use 'auto', "
- "which sets the context/group automatically based on "
- "the current working directory. "
- "To remove context, use 'none'. ")
+ p_context = subparsers.add_parser(
+ "context",
+ help="set context",
+ description="Set and remove context. A context is a group."
+ " When set, all operations apply only to repos in that group.",
+ )
+ p_context.add_argument(
+ "choice",
+ nargs="?",
+ choices=set().union(utils.get_groups(), {"none", "auto"}),
+ help="Without this argument, show current context. "
+ "Otherwise choose a group as context, or use 'auto', "
+ "which sets the context/group automatically based on "
+ "the current working directory. "
+ "To remove context, use 'none'. ",
+ )
p_context.set_defaults(func=f_context)
p_ls = subparsers.add_parser(
- 'ls', help='show repo(s) or repo path',
- description='display names of all repos, or path of a chosen repo')
- p_ls.add_argument('repo',
- nargs='?',
- choices=utils.get_repos(),
- help="show path of the chosen repo")
+ "ls",
+ help="show repo(s) or repo path",
+ description="display names of all repos, or path of a chosen repo",
+ )
+ p_ls.add_argument(
+ "repo",
+ nargs="?",
+ choices=utils.get_repos(),
+ help="show path of the chosen repo",
+ )
p_ls.set_defaults(func=f_ls)
p_group = subparsers.add_parser(
- 'group', description='list, add, or remove repo group(s)',
- help='group repos')
+ "group", description="list, add, or remove repo group(s)", help="group repos"
+ )
p_group.set_defaults(func=f_group)
- group_cmds = p_group.add_subparsers(dest='group_cmd',
- help='additional help with sub-command -h')
- pg_ll = group_cmds.add_parser('ll', description='List all groups with repos.')
- pg_ll.add_argument('to_show',
- nargs='?',
- choices=utils.get_groups(),
- help="group to show")
- group_cmds.add_parser('ls', description='List all group names.')
- pg_add = group_cmds.add_parser('add', description='Add repo(s) to a group.')
- pg_add.add_argument('to_group',
- nargs='+',
- metavar='repo',
- choices=utils.get_repos(),
- help="repo(s) to be grouped")
- pg_add.add_argument('-n', '--name',
- dest='gname',
- type=partial(_group_name, exclude_old_names=False),
- metavar='group-name',
- required=True)
- pg_add.add_argument('-p', '--path',
- dest='gpath',
- type=_path_name,
- metavar='group-path')
-
- pg_rmrepo = group_cmds.add_parser('rmrepo', description='remove repo(s) from a group.')
- pg_rmrepo.add_argument('to_rm',
- nargs='+',
- metavar='repo',
- choices=utils.get_repos(),
- help="repo(s) to be removed from the group")
- pg_rmrepo.add_argument('-n', '--name',
- dest='gname',
- metavar='group-name',
- required=True,
- help="group name")
- pg_rename = group_cmds.add_parser('rename', description='Change group name.')
- pg_rename.add_argument('gname', metavar='group-name',
- choices=utils.get_groups(),
- help="existing group to rename")
- pg_rename.add_argument('new_name', metavar='new-name',
- type=_group_name,
- help="new group name")
- group_cmds.add_parser('rm',
- description='Remove group(s).').add_argument('to_ungroup',
- nargs='+',
- choices=utils.get_groups(),
- help="group(s) to delete")
+ group_cmds = p_group.add_subparsers(
+ dest="group_cmd", help="additional help with sub-command -h"
+ )
+ pg_ll = group_cmds.add_parser("ll", description="List all groups with repos.")
+ pg_ll.add_argument(
+ "to_show", nargs="?", choices=utils.get_groups(), help="group to show"
+ )
+ group_cmds.add_parser("ls", description="List all group names.")
+ pg_add = group_cmds.add_parser("add", description="Add repo(s) to a group.")
+ pg_add.add_argument(
+ "to_group",
+ nargs="+",
+ metavar="repo",
+ choices=utils.get_repos(),
+ help="repo(s) to be grouped",
+ )
+ pg_add.add_argument(
+ "-n",
+ "--name",
+ dest="gname",
+ type=partial(_group_name, exclude_old_names=False),
+ metavar="group-name",
+ required=True,
+ )
+ pg_add.add_argument(
+ "-p", "--path", dest="gpath", type=_path_name, metavar="group-path"
+ )
+
+ pg_rmrepo = group_cmds.add_parser(
+ "rmrepo", description="remove repo(s) from a group."
+ )
+ pg_rmrepo.add_argument(
+ "to_rm",
+ nargs="+",
+ metavar="repo",
+ choices=utils.get_repos(),
+ help="repo(s) to be removed from the group",
+ )
+ pg_rmrepo.add_argument(
+ "-n",
+ "--name",
+ dest="gname",
+ metavar="group-name",
+ required=True,
+ help="group name",
+ )
+ pg_rename = group_cmds.add_parser("rename", description="Change group name.")
+ pg_rename.add_argument(
+ "gname",
+ metavar="group-name",
+ choices=utils.get_groups(),
+ help="existing group to rename",
+ )
+ pg_rename.add_argument(
+ "new_name", metavar="new-name", type=_group_name, help="new group name"
+ )
+ group_cmds.add_parser("rm", description="Remove group(s).").add_argument(
+ "to_ungroup", nargs="+", choices=utils.get_groups(), help="group(s) to delete"
+ )
# superman mode
p_super = subparsers.add_parser(
- 'super',
- help='run any git command/alias',
- description='Superman mode: delegate any git command/alias in specified repo(s), group(s), or '
- 'all repo(s).\n'
+ "super",
+ help="run any git command/alias",
+ description="Superman mode: delegate any git command/alias in specified repo(s), group(s), or "
+ "all repo(s).\n"
'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n'
- '\t gita super repo1 repo2 repo3 checkout new-feature')
+ "\t gita super repo1 repo2 repo3 checkout new-feature",
+ )
p_super.add_argument(
- 'man',
+ "man",
nargs=argparse.REMAINDER,
help="execute arbitrary git command/alias for specified repo(s), group(s), or all repos.\n"
"Example: gita super repo1 diff --name-only --staged;\n"
- "gita super checkout master ")
+ "gita super checkout master ",
+ )
+ p_super.add_argument(
+ "-q", "--quote-mode", action="store_true", help="use quote mode"
+ )
p_super.set_defaults(func=f_super)
# shell mode
p_shell = subparsers.add_parser(
- 'shell',
- help='run any shell command',
- description='shell mode: delegate any shell command in specified repo(s), group(s), or '
- 'all repo(s).\n'
- 'Examples:\n \t gita shell pwd; \n'
- '\t gita shell repo1 repo2 repo3 touch xx')
+ "shell",
+ help="run any shell command",
+ description="shell mode: delegate any shell command in specified repo(s), group(s), or "
+ "all repo(s).\n"
+ "Examples:\n \t gita shell pwd; \n"
+ "\t gita shell repo1 repo2 repo3 touch xx",
+ )
p_shell.add_argument(
- 'man',
+ "man",
nargs=argparse.REMAINDER,
help="execute arbitrary shell command for specified repo(s), group(s), or all repos.\n"
"Example: gita shell myrepo1 ls\n"
- "Another: gita shell git checkout master ")
+ "Another: gita shell git checkout master ",
+ )
+ p_shell.add_argument(
+ "-q", "--quote-mode", action="store_true", help="use quote mode"
+ )
p_shell.set_defaults(func=f_shell)
+ # clear
+ p_clear = subparsers.add_parser(
+ "clear",
+ description="removes all groups and repositories",
+ help="removes all groups and repositories",
+ )
+ p_clear.set_defaults(func=f_clear)
+
# sub-commands that fit boilerplate
cmds = utils.get_cmds_from_files()
for name, data in cmds.items():
- help = data.get('help')
- cmd = data['cmd']
- if data.get('allow_all'):
+ help = data.get("help")
+ cmd = data["cmd"]
+ if data.get("allow_all"):
choices = utils.get_choices()
- nargs = '*'
- help += ' for all repos or'
+ nargs = "*"
+ help += " for all repos or"
else:
choices = utils.get_repos().keys() | utils.get_groups().keys()
- nargs = '+'
- help += ' for the chosen repo(s) or group(s)'
+ nargs = "+"
+ help += " for the chosen repo(s) or group(s)"
sp = subparsers.add_parser(name, description=help)
- sp.add_argument('repo', nargs=nargs, choices=choices, help=help)
- is_shell = bool(data.get('shell'))
- sp.add_argument('-s', '--shell', default=is_shell, type=bool,
- help='If set, run in shell mode')
+ sp.add_argument("repo", nargs=nargs, choices=choices, help=help)
+ is_shell = bool(data.get("shell"))
+ sp.add_argument(
+ "-s",
+ "--shell",
+ default=is_shell,
+ type=bool,
+ help="If set, run in shell mode",
+ )
if is_shell:
cmd = [cmd]
else:
@@ -648,14 +768,14 @@ def main(argv=None):
args = p.parse_args(argv)
args.async_blacklist = {
- name
- for name, data in cmds.items() if data.get('disable_async')
+ name for name, data in cmds.items() if data.get("disable_async")
}
- if 'func' in args:
+ if "func" in args:
args.func(args)
else:
p.print_help() # pragma: no cover
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main() # pragma: no cover
diff --git a/gita/cmds.json b/gita/cmds.json
index 947a4dd..eadda81 100644
--- a/gita/cmds.json
+++ b/gita/cmds.json
@@ -47,6 +47,7 @@
},
"push":{
"cmd": "git push",
+ "allow_all": true,
"help": "push the local updates"
},
"rebase":{
diff --git a/gita/utils.py b/gita/utils.py
index 8e7d9c4..5332255 100644
--- a/gita/utils.py
+++ b/gita/utils.py
@@ -279,6 +279,7 @@ def _make_name(path: str, repos: Dict[str, Dict[str, str]],
def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
include_bare=False,
exclude_submodule=False,
+ dry_run=False,
) -> Dict[str, Dict[str, str]]:
"""
Write new repo paths to file; return the added repos.
@@ -291,6 +292,10 @@ def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
new_repos = {}
if new_paths:
print(f"Found {len(new_paths)} new repo(s).")
+ if dry_run:
+ for p in new_paths:
+ print(p)
+ return {}
name_counts = Counter(
os.path.basename(os.path.normpath(p)) for p in new_paths
)
@@ -453,14 +458,14 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
return cmds
-def parse_repos_and_rest(input: List[str]
+def parse_repos_and_rest(input: List[str], quote_mode=False,
) -> Tuple[Dict[str, Dict[str, str]], List[str]]:
"""
Parse gita input arguments
@return: repos and the rest (e.g., gita shell and super commands)
"""
- i = None
+ i = 0
names = []
repos = get_repos()
groups = get_groups()
@@ -475,6 +480,10 @@ def parse_repos_and_rest(input: List[str]
i += 1
if not names and ctx:
names = [ctx.stem]
+ if quote_mode and i + 1 != len(input):
+ print(input[i], 'is not a repo or group' )
+ sys.exit(2)
+
if names:
chosen = {}
for k in names:
diff --git a/setup.py b/setup.py
index 4be31ed..97f3223 100644
--- a/setup.py
+++ b/setup.py
@@ -1,24 +1,24 @@
from setuptools import setup
long_description = None
-with open('README.md', encoding='utf-8') as f:
+with open("README.md", encoding="utf-8") as f:
long_description = f.read()
setup(
- name='gita',
- packages=['gita'],
- version='0.16.1',
- license='MIT',
- description='Manage multiple git repos with sanity',
+ name="gita",
+ packages=["gita"],
+ version="0.16.2",
+ license="MIT",
+ description="Manage multiple git repos with sanity",
long_description=long_description,
- long_description_content_type='text/markdown',
- url='https://github.com/nosarthur/gita',
- platforms=['linux', 'osx', 'win32'],
- keywords=['git', 'manage multiple repositories', 'cui', 'command-line'],
- author='Dong Zhou',
- author_email='zhou.dong@gmail.com',
- entry_points={'console_scripts': ['gita = gita.__main__:main']},
- python_requires='~=3.6',
+ long_description_content_type="text/markdown",
+ url="https://github.com/nosarthur/gita",
+ platforms=["linux", "osx", "win32"],
+ keywords=["git", "manage multiple repositories", "cui", "command-line"],
+ author="Dong Zhou",
+ author_email="zhou.dong@gmail.com",
+ entry_points={"console_scripts": ["gita = gita.__main__:main"]},
+ python_requires="~=3.6",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
diff --git a/tests/test_main.py b/tests/test_main.py
index 3941ea9..e36c2d8 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -1,4 +1,3 @@
-import os
import pytest
from unittest.mock import patch
from pathlib import Path
@@ -7,312 +6,368 @@ import asyncio
import shlex
from gita import __main__
-from gita import utils, info, common
+from gita import utils, info
from conftest import (
- PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME,
- async_mock, TEST_DIR,
+ PATH_FNAME,
+ PATH_FNAME_EMPTY,
+ PATH_FNAME_CLASH,
+ GROUP_FNAME,
+ async_mock,
+ TEST_DIR,
)
-@patch('gita.utils.get_repos', return_value={'aa'})
+@patch("gita.utils.get_repos", return_value={"aa"})
def test_group_name(_):
- got = __main__._group_name('xx')
- assert got == 'xx'
+ got = __main__._group_name("xx")
+ assert got == "xx"
with pytest.raises(SystemExit):
- __main__._group_name('aa')
+ __main__._group_name("aa")
class TestAdd:
-
- @pytest.mark.parametrize('input, expected', [
- (['add', '.'], ''),
- ])
- @patch('gita.common.get_config_fname')
+ @pytest.mark.parametrize(
+ "input, expected",
+ [
+ (["add", "."], ""),
+ ],
+ )
+ @patch("gita.common.get_config_fname")
def test_add(self, mock_path_fname, tmp_path, input, expected):
def side_effect(input, _=None):
- return tmp_path / f'{input}.txt'
+ return tmp_path / f"{input}.txt"
+
mock_path_fname.side_effect = side_effect
utils.get_repos.cache_clear()
__main__.main(input)
utils.get_repos.cache_clear()
got = utils.get_repos()
assert len(got) == 1
- assert got['gita']['type'] == expected
+ assert got["gita"]["type"] == expected
-@pytest.mark.parametrize('path_fname, expected', [
- (PATH_FNAME, ''),
- (PATH_FNAME_CLASH, "repo2: ['--haha', '--pp']\n"),
- ])
-@patch('gita.utils.is_git', return_value=True)
-@patch('gita.utils.get_groups', return_value={})
-@patch('gita.common.get_config_fname')
+@pytest.mark.parametrize(
+ "path_fname, expected",
+ [
+ (PATH_FNAME, ""),
+ (PATH_FNAME_CLASH, "repo2: ['--haha', '--pp']\n"),
+ ],
+)
+@patch("gita.utils.is_git", return_value=True)
+@patch("gita.utils.get_groups", return_value={})
+@patch("gita.common.get_config_fname")
def test_flags(mock_path_fname, _, __, path_fname, expected, capfd):
mock_path_fname.return_value = path_fname
utils.get_repos.cache_clear()
- __main__.main(['flags'])
+ __main__.main(["flags"])
out, err = capfd.readouterr()
- assert err == ''
+ assert err == ""
assert out == expected
class TestLsLl:
- @patch('gita.common.get_config_fname')
+ @patch("gita.common.get_config_fname")
def test_ll(self, mock_path_fname, capfd, tmp_path):
"""
functional test
"""
# avoid modifying the local configuration
def side_effect(input, _=None):
- return tmp_path / f'{input}.txt'
+ return tmp_path / f"{input}.txt"
+
mock_path_fname.side_effect = side_effect
utils.get_repos.cache_clear()
- __main__.main(['add', '.'])
+ __main__.main(["add", "."])
out, err = capfd.readouterr()
- assert err == ''
- assert 'Found 1 new repo(s).\n' == out
+ assert err == ""
+ assert "Found 1 new repo(s).\n" == out
# in production this is not needed
utils.get_repos.cache_clear()
- __main__.main(['ls'])
+ __main__.main(["ls"])
out, err = capfd.readouterr()
- assert err == ''
- assert 'gita\n' == out
+ assert err == ""
+ assert "gita\n" == out
- __main__.main(['ll'])
+ __main__.main(["ll"])
out, err = capfd.readouterr()
- assert err == ''
- assert 'gita' in out
+ assert err == ""
+ assert "gita" in out
assert info.Color.end in out
# no color on branch name
- __main__.main(['ll', '-C'])
+ __main__.main(["ll", "-C"])
out, err = capfd.readouterr()
- assert err == ''
- assert 'gita' in out
+ assert err == ""
+ assert "gita" in out
assert info.Color.end not in out
- __main__.main(['ls', 'gita'])
+ __main__.main(["ls", "gita"])
out, err = capfd.readouterr()
- assert err == ''
- assert out.strip() == utils.get_repos()['gita']['path']
+ assert err == ""
+ assert out.strip() == utils.get_repos()["gita"]["path"]
def test_ls(self, monkeypatch, capfd):
- monkeypatch.setattr(utils, 'get_repos',
- lambda: {'repo1': {'path': '/a/'}, 'repo2': {'path': '/b/'}})
- monkeypatch.setattr(utils, 'describe', lambda x: x)
- __main__.main(['ls'])
+ monkeypatch.setattr(
+ utils,
+ "get_repos",
+ lambda: {"repo1": {"path": "/a/"}, "repo2": {"path": "/b/"}},
+ )
+ monkeypatch.setattr(utils, "describe", lambda x: x)
+ __main__.main(["ls"])
out, err = capfd.readouterr()
- assert err == ''
+ assert err == ""
assert out == "repo1 repo2\n"
- __main__.main(['ls', 'repo1'])
+ __main__.main(["ls", "repo1"])
out, err = capfd.readouterr()
- assert err == ''
- assert out == '/a/\n'
-
- @pytest.mark.parametrize('path_fname, expected', [
- (PATH_FNAME,
- "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \nxxx cmaster dsu\x1b[0m msg \n"),
- (PATH_FNAME_EMPTY, ""),
- (PATH_FNAME_CLASH,
- "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \n"
- ),
- ])
- @patch('gita.utils.is_git', return_value=True)
- @patch('gita.info.get_head', return_value="master")
- @patch('gita.info._get_repo_status', return_value=("d", "s", "u", "c"))
- @patch('gita.info.get_commit_msg', return_value="msg")
- @patch('gita.info.get_commit_time', return_value="")
- @patch('gita.common.get_config_fname')
- def test_with_path_files(self, mock_path_fname, _0, _1, _2, _3, _4, path_fname,
- expected, capfd):
+ assert err == ""
+ assert out == "/a/\n"
+
+ @pytest.mark.parametrize(
+ "path_fname, expected",
+ [
+ (
+ PATH_FNAME,
+ "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \nxxx cmaster dsu\x1b[0m msg \n",
+ ),
+ (PATH_FNAME_EMPTY, ""),
+ (
+ PATH_FNAME_CLASH,
+ "repo1 cmaster dsu\x1b[0m msg \nrepo2 cmaster dsu\x1b[0m msg \n",
+ ),
+ ],
+ )
+ @patch("gita.utils.is_git", return_value=True)
+ @patch("gita.info.get_head", return_value="master")
+ @patch("gita.info._get_repo_status", return_value=("d", "s", "u", "c"))
+ @patch("gita.info.get_commit_msg", return_value="msg")
+ @patch("gita.info.get_commit_time", return_value="")
+ @patch("gita.common.get_config_fname")
+ def test_with_path_files(
+ self, mock_path_fname, _0, _1, _2, _3, _4, path_fname, expected, capfd
+ ):
def side_effect(input, _=None):
- if input == 'repos.csv':
+ if input == "repos.csv":
return path_fname
- return f'/{input}'
+ return f"/{input}"
+
mock_path_fname.side_effect = side_effect
utils.get_repos.cache_clear()
- __main__.main(['ll'])
+ __main__.main(["ll"])
out, err = capfd.readouterr()
print(out)
- assert err == ''
+ assert err == ""
assert out == expected
-@pytest.mark.parametrize('input, expected', [
- ({'repo1': {'path': '/a/'}, 'repo2': {'path': '/b/'}}, ''),
- ])
-@patch('subprocess.run')
-@patch('gita.utils.get_repos')
+@pytest.mark.parametrize(
+ "input, expected",
+ [
+ ({"repo1": {"path": "/a/"}, "repo2": {"path": "/b/"}}, ""),
+ ],
+)
+@patch("subprocess.run")
+@patch("gita.utils.get_repos")
def test_freeze(mock_repos, mock_run, input, expected, capfd):
mock_repos.return_value = input
- __main__.main(['freeze'])
+ __main__.main(["freeze"])
assert mock_run.call_count == 2
out, err = capfd.readouterr()
- assert err == ''
+ assert err == ""
assert out == expected
-@patch('gita.utils.parse_clone_config', return_value=[
- ['git@github.com:user/repo.git', 'repo', '/a/repo']])
-@patch('gita.utils.run_async', new=async_mock())
-@patch('subprocess.run')
-def test_clone(*_):
- asyncio.set_event_loop(asyncio.new_event_loop())
+@patch("subprocess.run")
+def test_clone_with_url(mock_run):
args = argparse.Namespace()
- args.fname = ['freeze_filename']
+ args.clonee = "http://abc.com/repo1"
args.preserve_path = None
+ args.directory = "/home/xxx"
+ args.from_file = False
+ __main__.f_clone(args)
+ cmds = ["git", "clone", args.clonee]
+ mock_run.assert_called_once_with(cmds, cwd=args.directory)
+
+
+@patch(
+ "gita.utils.parse_clone_config",
+ return_value=[["git@github.com:user/repo.git", "repo", "/a/repo"]],
+)
+@patch("gita.utils.run_async", new=async_mock())
+@patch("subprocess.run")
+def test_clone_with_config_file(*_):
+ asyncio.set_event_loop(asyncio.new_event_loop())
+ args = argparse.Namespace()
+ args.clonee = "freeze_filename"
+ args.preserve_path = False
+ args.directory = None
+ args.from_file = True
__main__.f_clone(args)
mock_run = utils.run_async.mock
assert mock_run.call_count == 1
- cmds = ['git', 'clone', 'git@github.com:user/repo.git']
- mock_run.assert_called_once_with('repo', Path.cwd(), cmds)
+ cmds = ["git", "clone", "git@github.com:user/repo.git"]
+ mock_run.assert_called_once_with("repo", Path.cwd(), cmds)
-@patch('gita.utils.parse_clone_config', return_value=[
- ['git@github.com:user/repo.git', 'repo', '/a/repo']])
-@patch('gita.utils.run_async', new=async_mock())
-@patch('subprocess.run')
+@patch(
+ "gita.utils.parse_clone_config",
+ return_value=[["git@github.com:user/repo.git", "repo", "/a/repo"]],
+)
+@patch("gita.utils.run_async", new=async_mock())
+@patch("subprocess.run")
def test_clone_with_preserve_path(*_):
asyncio.set_event_loop(asyncio.new_event_loop())
args = argparse.Namespace()
- args.fname = ['freeze_filename']
+ args.clonee = "freeze_filename"
+ args.directory = None
+ args.from_file = True
args.preserve_path = True
__main__.f_clone(args)
mock_run = utils.run_async.mock
assert mock_run.call_count == 1
- cmds = ['git', 'clone', 'git@github.com:user/repo.git', '/a/repo']
- mock_run.assert_called_once_with('repo', Path.cwd(), cmds)
+ cmds = ["git", "clone", "git@github.com:user/repo.git", "/a/repo"]
+ mock_run.assert_called_once_with("repo", Path.cwd(), cmds)
-@patch('os.makedirs')
-@patch('os.path.isfile', return_value=True)
-@patch('gita.common.get_config_fname', return_value='some path')
-@patch('gita.utils.get_repos', return_value={'repo1': {'path': '/a/', 'type': ''},
- 'repo2': {'path': '/b/', 'type': ''}})
-@patch('gita.utils.write_to_repo_file')
+@patch("os.makedirs")
+@patch("os.path.isfile", return_value=True)
+@patch("gita.common.get_config_fname", return_value="some path")
+@patch(
+ "gita.utils.get_repos",
+ return_value={
+ "repo1": {"path": "/a/", "type": ""},
+ "repo2": {"path": "/b/", "type": ""},
+ },
+)
+@patch("gita.utils.write_to_repo_file")
def test_rm(mock_write, *_):
args = argparse.Namespace()
- args.repo = ['repo1']
+ args.repo = ["repo1"]
__main__.f_rm(args)
- mock_write.assert_called_once_with(
- {'repo2': {'path': '/b/', 'type': ''}}, 'w')
+ mock_write.assert_called_once_with({"repo2": {"path": "/b/", "type": ""}}, "w")
def test_not_add():
# this won't write to disk because the repo is not valid
- __main__.main(['add', '/home/some/repo/'])
+ __main__.main(["add", "/home/some/repo/"])
-@patch('gita.utils.get_repos', return_value={'repo2': {'path': '/d/efg',
- 'flags': []}})
-@patch('subprocess.run')
+@patch("gita.utils.get_repos", return_value={"repo2": {"path": "/d/efg", "flags": []}})
+@patch("subprocess.run")
def test_fetch(mock_run, *_):
asyncio.set_event_loop(asyncio.new_event_loop())
- __main__.main(['fetch'])
- mock_run.assert_called_once_with(['git', 'fetch'], cwd='/d/efg', shell=False)
+ __main__.main(["fetch"])
+ mock_run.assert_called_once_with(["git", "fetch"], cwd="/d/efg", shell=False)
@patch(
- 'gita.utils.get_repos', return_value={
- 'repo1': {'path': '/a/bc', 'flags': []},
- 'repo2': {'path': '/d/efg', 'flags': []}
- })
-@patch('gita.utils.run_async', new=async_mock())
-@patch('subprocess.run')
+ "gita.utils.get_repos",
+ return_value={
+ "repo1": {"path": "/a/bc", "flags": []},
+ "repo2": {"path": "/d/efg", "flags": []},
+ },
+)
+@patch("gita.utils.run_async", new=async_mock())
+@patch("subprocess.run")
def test_async_fetch(*_):
- __main__.main(['fetch'])
+ __main__.main(["fetch"])
mock_run = utils.run_async.mock
assert mock_run.call_count == 2
- cmds = ['git', 'fetch']
+ cmds = ["git", "fetch"]
# print(mock_run.call_args_list)
- mock_run.assert_any_call('repo1', '/a/bc', cmds)
- mock_run.assert_any_call('repo2', '/d/efg', cmds)
+ mock_run.assert_any_call("repo1", "/a/bc", cmds)
+ mock_run.assert_any_call("repo2", "/d/efg", cmds)
-@pytest.mark.parametrize('input', [
- 'diff --name-only --staged',
- "commit -am 'lala kaka'",
-])
-@patch('gita.utils.get_repos', return_value={'repo7': {'path': 'path7', 'flags': []}})
-@patch('subprocess.run')
+@pytest.mark.parametrize(
+ "input",
+ [
+ "diff --name-only --staged",
+ "commit -am 'lala kaka'",
+ ],
+)
+@patch("gita.utils.get_repos", return_value={"repo7": {"path": "path7", "flags": []}})
+@patch("subprocess.run")
def test_superman(mock_run, _, input):
mock_run.reset_mock()
- args = ['super', 'repo7'] + shlex.split(input)
+ args = ["super", "repo7"] + shlex.split(input)
__main__.main(args)
- expected_cmds = ['git'] + shlex.split(input)
- mock_run.assert_called_once_with(expected_cmds, cwd='path7', shell=False)
+ expected_cmds = ["git"] + shlex.split(input)
+ mock_run.assert_called_once_with(expected_cmds, cwd="path7", shell=False)
-@pytest.mark.parametrize('input', [
- 'diff --name-only --staged',
- "commit -am 'lala kaka'",
-])
-@patch('gita.utils.get_repos', return_value={'repo7': {'path': 'path7', 'flags': []}})
-@patch('subprocess.run')
+@pytest.mark.parametrize(
+ "input",
+ [
+ "diff --name-only --staged",
+ "commit -am 'lala kaka'",
+ ],
+)
+@patch("gita.utils.get_repos", return_value={"repo7": {"path": "path7", "flags": []}})
+@patch("subprocess.run")
def test_shell(mock_run, _, input):
mock_run.reset_mock()
- args = ['shell', 'repo7', input]
+ args = ["shell", "repo7", input]
__main__.main(args)
expected_cmds = input
- mock_run.assert_called_once_with(expected_cmds, cwd='path7', shell=True, stderr=-2, stdout=-1)
+ mock_run.assert_called_once_with(
+ expected_cmds, cwd="path7", shell=True, stderr=-2, stdout=-1
+ )
class TestContext:
-
- @patch('gita.utils.get_context', return_value=None)
+ @patch("gita.utils.get_context", return_value=None)
def test_display_no_context(self, _, capfd):
- __main__.main(['context'])
+ __main__.main(["context"])
out, err = capfd.readouterr()
- assert err == ''
- assert 'Context is not set\n' == out
+ assert err == ""
+ assert "Context is not set\n" == out
- @patch('gita.utils.get_context', return_value=Path('gname.context'))
- @patch('gita.utils.get_groups', return_value={'gname': {'repos': ['a', 'b']}})
+ @patch("gita.utils.get_context", return_value=Path("gname.context"))
+ @patch("gita.utils.get_groups", return_value={"gname": {"repos": ["a", "b"]}})
def test_display_context(self, _, __, capfd):
- __main__.main(['context'])
+ __main__.main(["context"])
out, err = capfd.readouterr()
- assert err == ''
- assert 'gname: a b\n' == out
+ assert err == ""
+ assert "gname: a b\n" == out
- @patch('gita.utils.get_context')
+ @patch("gita.utils.get_context")
def test_reset(self, mock_ctx):
- __main__.main(['context', 'none'])
+ __main__.main(["context", "none"])
mock_ctx.return_value.unlink.assert_called()
- @patch('gita.utils.get_context', return_value=None)
- @patch('gita.common.get_config_dir', return_value=TEST_DIR)
- @patch('gita.utils.get_groups', return_value={'lala': ['b'], 'kaka': []})
+ @patch("gita.utils.get_context", return_value=None)
+ @patch("gita.common.get_config_dir", return_value=TEST_DIR)
+ @patch("gita.utils.get_groups", return_value={"lala": ["b"], "kaka": []})
def test_set_first_time(self, *_):
- ctx = TEST_DIR / 'lala.context'
+ ctx = TEST_DIR / "lala.context"
assert not ctx.is_file()
- __main__.main(['context', 'lala'])
+ __main__.main(["context", "lala"])
assert ctx.is_file()
ctx.unlink()
- @patch('gita.common.get_config_dir', return_value=TEST_DIR)
- @patch('gita.utils.get_groups', return_value={'lala': ['b'], 'kaka': []})
- @patch('gita.utils.get_context')
+ @patch("gita.common.get_config_dir", return_value=TEST_DIR)
+ @patch("gita.utils.get_groups", return_value={"lala": ["b"], "kaka": []})
+ @patch("gita.utils.get_context")
def test_set_second_time(self, mock_ctx, *_):
- __main__.main(['context', 'kaka'])
+ __main__.main(["context", "kaka"])
mock_ctx.return_value.rename.assert_called()
class TestGroupCmd:
-
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
def test_ls(self, _, capfd):
args = argparse.Namespace()
args.to_group = None
- args.group_cmd = 'ls'
+ args.group_cmd = "ls"
utils.get_groups.cache_clear()
__main__.f_group(args)
out, err = capfd.readouterr()
- assert err == ''
- assert 'xx yy\n' == out
+ assert err == ""
+ assert "xx yy\n" == out
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
def test_ll(self, _, capfd):
args = argparse.Namespace()
args.to_group = None
@@ -321,175 +376,228 @@ class TestGroupCmd:
utils.get_groups.cache_clear()
__main__.f_group(args)
out, err = capfd.readouterr()
- assert err == ''
- assert out == '\x1b[4mxx\x1b[0m: \n - a\n - b\n\x1b[4myy\x1b[0m: \n - a\n - c\n - d\n'
+ assert err == ""
+ assert (
+ out
+ == "\x1b[4mxx\x1b[0m: \n - a\n - b\n\x1b[4myy\x1b[0m: \n - a\n - c\n - d\n"
+ )
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
def test_ll_with_group(self, _, capfd):
args = argparse.Namespace()
args.to_group = None
args.group_cmd = None
- args.to_show = 'yy'
+ args.to_show = "yy"
utils.get_groups.cache_clear()
__main__.f_group(args)
out, err = capfd.readouterr()
- assert err == ''
- assert 'a c d\n' == out
+ assert err == ""
+ assert "a c d\n" == out
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
- @patch('gita.utils.write_to_groups_file')
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
+ @patch("gita.utils.write_to_groups_file")
def test_rename(self, mock_write, _):
args = argparse.Namespace()
- args.gname = 'xx'
- args.new_name = 'zz'
- args.group_cmd = 'rename'
+ args.gname = "xx"
+ args.new_name = "zz"
+ args.group_cmd = "rename"
utils.get_groups.cache_clear()
__main__.f_group(args)
- expected = {'yy': {'repos': ['a', 'c', 'd'], 'path': ''},
- 'zz': {'repos': ['a', 'b'], 'path': ''}}
- mock_write.assert_called_once_with(expected, 'w')
-
- @patch('gita.info.get_color_encoding', return_value=info.default_colors)
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
+ expected = {
+ "yy": {"repos": ["a", "c", "d"], "path": ""},
+ "zz": {"repos": ["a", "b"], "path": ""},
+ }
+ mock_write.assert_called_once_with(expected, "w")
+
+ @patch("gita.info.get_color_encoding", return_value=info.default_colors)
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
def test_rename_error(self, *_):
utils.get_groups.cache_clear()
with pytest.raises(SystemExit, match="1"):
- __main__.main('group rename xx yy'.split())
-
- @pytest.mark.parametrize('input, expected', [
- ('xx', {'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}),
- ("xx yy", {}),
- ])
- @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''})
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
- @patch('gita.utils.write_to_groups_file')
+ __main__.main("group rename xx yy".split())
+
+ @pytest.mark.parametrize(
+ "input, expected",
+ [
+ ("xx", {"yy": {"repos": ["a", "c", "d"], "path": ""}}),
+ ("xx yy", {}),
+ ],
+ )
+ @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
+ @patch("gita.utils.write_to_groups_file")
def test_rm(self, mock_write, _, __, input, expected):
utils.get_groups.cache_clear()
- args = ['group', 'rm'] + shlex.split(input)
+ args = ["group", "rm"] + shlex.split(input)
__main__.main(args)
- mock_write.assert_called_once_with(expected, 'w')
+ mock_write.assert_called_once_with(expected, "w")
- @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''})
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
- @patch('gita.utils.write_to_groups_file')
+ @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
+ @patch("gita.utils.write_to_groups_file")
def test_add(self, mock_write, *_):
args = argparse.Namespace()
- args.to_group = ['a', 'c']
- args.group_cmd = 'add'
- args.gname = 'zz'
+ args.to_group = ["a", "c"]
+ args.group_cmd = "add"
+ args.gname = "zz"
utils.get_groups.cache_clear()
__main__.f_group(args)
mock_write.assert_called_once_with(
- {'zz': {'repos': ['a', 'c'], 'path': ''}}, 'a+')
+ {"zz": {"repos": ["a", "c"], "path": ""}}, "a+"
+ )
- @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''})
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
- @patch('gita.utils.write_to_groups_file')
+ @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
+ @patch("gita.utils.write_to_groups_file")
def test_add_to_existing(self, mock_write, *_):
args = argparse.Namespace()
- args.to_group = ['a', 'c']
- args.group_cmd = 'add'
- args.gname = 'xx'
+ args.to_group = ["a", "c"]
+ args.group_cmd = "add"
+ args.gname = "xx"
utils.get_groups.cache_clear()
__main__.f_group(args)
mock_write.assert_called_once_with(
- {'xx': {'repos': ['a', 'b', 'c'], 'path': ''},
- 'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}, 'w')
-
- @patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''})
- @patch('gita.common.get_config_fname', return_value=GROUP_FNAME)
- @patch('gita.utils.write_to_groups_file')
+ {
+ "xx": {"repos": ["a", "b", "c"], "path": ""},
+ "yy": {"repos": ["a", "c", "d"], "path": ""},
+ },
+ "w",
+ )
+
+ @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
+ @patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
+ @patch("gita.utils.write_to_groups_file")
def test_rm_repo(self, mock_write, *_):
args = argparse.Namespace()
- args.to_rm = ['a', 'c']
- args.group_cmd = 'rmrepo'
- args.gname = 'xx'
+ args.to_rm = ["a", "c"]
+ args.group_cmd = "rmrepo"
+ args.gname = "xx"
utils.get_groups.cache_clear()
__main__.f_group(args)
mock_write.assert_called_once_with(
- {'xx': {'repos': ['b'], 'path': ''},
- 'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}, 'w')
-
- @patch('gita.common.get_config_fname')
+ {
+ "xx": {"repos": ["b"], "path": ""},
+ "yy": {"repos": ["a", "c", "d"], "path": ""},
+ },
+ "w",
+ )
+
+ @patch("gita.common.get_config_fname")
def test_integration(self, mock_path_fname, tmp_path, capfd):
def side_effect(input, _=None):
- return tmp_path / f'{input}.csv'
+ return tmp_path / f"{input}.csv"
+
mock_path_fname.side_effect = side_effect
- __main__.main('add .'.split())
+ __main__.main("add .".split())
utils.get_repos.cache_clear()
- __main__.main('group add gita -n test'.split())
+ __main__.main("group add gita -n test".split())
utils.get_groups.cache_clear()
- __main__.main('ll test'.split())
+ __main__.main("ll test".split())
out, err = capfd.readouterr()
- assert err == ''
- assert 'gita' in out
+ assert err == ""
+ assert "gita" in out
-@patch('gita.utils.is_git', return_value=True)
-@patch('gita.common.get_config_fname', return_value=PATH_FNAME)
-@patch('gita.utils.rename_repo')
+@patch("gita.utils.is_git", return_value=True)
+@patch("gita.common.get_config_fname", return_value=PATH_FNAME)
+@patch("gita.utils.rename_repo")
def test_rename(mock_rename, _, __):
utils.get_repos.cache_clear()
- args = ['rename', 'repo1', 'abc']
+ args = ["rename", "repo1", "abc"]
__main__.main(args)
mock_rename.assert_called_once_with(
- {'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []},
- 'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []},
- 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []}},
- 'repo1', 'abc')
+ {
+ "repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []},
+ "xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []},
+ "repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []},
+ },
+ "repo1",
+ "abc",
+ )
class TestInfo:
-
- @patch('gita.common.get_config_fname', return_value='')
+ @patch("gita.common.get_config_fname", return_value="")
def test_ll(self, _, capfd):
args = argparse.Namespace()
args.info_cmd = None
__main__.f_info(args)
out, err = capfd.readouterr()
- assert 'In use: branch,commit_msg,commit_time\nUnused: path\n' == out
- assert err == ''
+ assert "In use: branch,commit_msg,commit_time\nUnused: path\n" == out
+ assert err == ""
- @patch('gita.common.get_config_fname')
+ @patch("gita.common.get_config_fname")
def test_add(self, mock_get_fname, tmpdir):
args = argparse.Namespace()
- args.info_cmd = 'add'
- args.info_item = 'path'
+ args.info_cmd = "add"
+ args.info_item = "path"
with tmpdir.as_cwd():
- csv_config = Path.cwd() / 'info.csv'
+ csv_config = Path.cwd() / "info.csv"
mock_get_fname.return_value = csv_config
__main__.f_info(args)
items = info.get_info_items()
- assert items == ['branch', 'commit_msg', 'commit_time', 'path']
+ assert items == ["branch", "commit_msg", "commit_time", "path"]
- @patch('gita.common.get_config_fname')
+ @patch("gita.common.get_config_fname")
def test_rm(self, mock_get_fname, tmpdir):
args = argparse.Namespace()
- args.info_cmd = 'rm'
- args.info_item = 'commit_msg'
+ args.info_cmd = "rm"
+ args.info_item = "commit_msg"
with tmpdir.as_cwd():
- csv_config = Path.cwd() / 'info.csv'
+ csv_config = Path.cwd() / "info.csv"
mock_get_fname.return_value = csv_config
__main__.f_info(args)
items = info.get_info_items()
- assert items == ['branch', 'commit_time']
+ assert items == ["branch", "commit_time"]
-@patch('gita.common.get_config_fname')
+@patch("gita.common.get_config_fname")
def test_set_color(mock_get_fname, tmpdir):
- args = argparse.Namespace()
- args.color_cmd = 'set'
- args.color = 'b_white'
- args.situation = 'no-remote'
- with tmpdir.as_cwd():
- csv_config = Path.cwd() / 'colors.csv'
- mock_get_fname.return_value = csv_config
- __main__.f_color(args)
+ args = argparse.Namespace()
+ args.color_cmd = "set"
+ args.color = "b_white"
+ args.situation = "no-remote"
+ with tmpdir.as_cwd():
+ csv_config = Path.cwd() / "colors.csv"
+ mock_get_fname.return_value = csv_config
+ __main__.f_color(args)
- info.get_color_encoding.cache_clear() # avoid side effect
- items = info.get_color_encoding()
info.get_color_encoding.cache_clear() # avoid side effect
- assert items == {'no-remote': 'b_white', 'in-sync': 'green',
- 'diverged': 'red', 'local-ahead': 'purple',
- 'remote-ahead': 'yellow'}
+ items = info.get_color_encoding()
+ info.get_color_encoding.cache_clear() # avoid side effect
+ assert items == {
+ "no-remote": "b_white",
+ "in-sync": "green",
+ "diverged": "red",
+ "local-ahead": "purple",
+ "remote-ahead": "yellow",
+ }
+
+
+@pytest.mark.parametrize(
+ "input, expected",
+ [
+ ({"repo1": {"path": "/a/"}, "repo2": {"path": "/b/"}}, ""),
+ ],
+)
+@patch("gita.utils.write_to_groups_file")
+@patch("gita.utils.write_to_repo_file")
+@patch("gita.utils.get_repos")
+def test_clear(
+ mock_repos,
+ mock_write_to_repo_file,
+ mock_write_to_groups_file,
+ input,
+ expected,
+ capfd,
+):
+ mock_repos.return_value = input
+ __main__.main(["clear"])
+ assert mock_write_to_repo_file.call_count == 1
+ mock_write_to_repo_file.assert_called_once_with({}, "w")
+ assert mock_write_to_groups_file.call_count == 1
+ mock_write_to_groups_file.assert_called_once_with({}, "w")
+ out, err = capfd.readouterr()
+ assert err == ""
+ assert out == expected