summaryrefslogtreecommitdiffstats
path: root/gita
diff options
context:
space:
mode:
Diffstat (limited to 'gita')
-rw-r--r--gita/__init__.py2
-rw-r--r--gita/__main__.py825
-rw-r--r--gita/cmds.json90
-rw-r--r--gita/cmds.yml65
-rw-r--r--gita/common.py17
-rw-r--r--gita/info.py293
-rw-r--r--gita/utils.py440
7 files changed, 1358 insertions, 374 deletions
diff --git a/gita/__init__.py b/gita/__init__.py
index eeb79a3..33c0f41 100644
--- a/gita/__init__.py
+++ b/gita/__init__.py
@@ -1,3 +1,3 @@
import pkg_resources
-__version__ = pkg_resources.get_distribution('gita').version
+__version__ = pkg_resources.get_distribution("gita").version
diff --git a/gita/__main__.py b/gita/__main__.py
index 6e47f8e..b2bc32b 100644
--- a/gita/__main__.py
+++ b/gita/__main__.py
@@ -1,4 +1,4 @@
-'''
+"""
Gita manages multiple git repos. It has two functionalities
1. display the status of multiple repos side by side
@@ -12,19 +12,82 @@ Examples:
For bash auto completion, download and source
https://github.com/nosarthur/gita/blob/master/.gita-completion.bash
-'''
+"""
import os
+import sys
+import csv
import argparse
import subprocess
+from functools import partial
import pkg_resources
+from itertools import chain
+from pathlib import Path
+import glob
-from . import utils, info
+from . import utils, info, common
+
+
+def _group_name(name: str, exclude_old_names=True) -> str:
+ """
+ Return valid group name
+ """
+ repos = utils.get_repos()
+ if name in repos:
+ print(f"Cannot use group name {name} since it's a repo name.")
+ sys.exit(1)
+ if exclude_old_names:
+ if name in utils.get_groups():
+ print(f"Cannot use group name {name} since it's already in use.")
+ sys.exit(1)
+ if name in {"none", "auto"}:
+ print(f"Cannot use group name {name} since it's a reserved keyword.")
+ sys.exit(1)
+ return name
+
+
+def _path_name(name: str) -> str:
+ """
+ Return absolute path
+ """
+ if name:
+ return os.path.abspath(name)
+ return ""
def f_add(args: argparse.Namespace):
repos = utils.get_repos()
- utils.add_repos(repos, args.paths)
+ paths = args.paths
+ dry_run = args.dry_run
+ groups = utils.get_groups()
+ if args.recursive or args.auto_group:
+ paths = (
+ p.rstrip(os.path.sep)
+ for p in chain.from_iterable(
+ glob.glob(os.path.join(p, "**/"), recursive=True) for p in args.paths
+ )
+ )
+ new_repos = utils.add_repos(
+ repos,
+ paths,
+ include_bare=args.bare,
+ exclude_submodule=args.skip_submodule,
+ dry_run=dry_run,
+ )
+ if dry_run:
+ return
+ if new_repos and args.auto_group:
+ new_groups = utils.auto_group(new_repos, args.paths)
+ if new_groups:
+ print(f"Created {len(new_groups)} new group(s).")
+ utils.write_to_groups_file(new_groups, "a+")
+ if new_repos and args.group:
+ gname = args.group
+ gname_repos = set(groups[gname]["repos"])
+ gname_repos.update(new_repos)
+ groups[gname]["repos"] = sorted(gname_repos)
+ print(f"Added {len(new_repos)} repos to the {gname} group")
+ utils.write_to_groups_file(groups, "w")
def f_rename(args: argparse.Namespace):
@@ -32,12 +95,123 @@ def f_rename(args: argparse.Namespace):
utils.rename_repo(repos, args.repo[0], args.new_name)
-def f_info(_):
- all_items, to_display = info.get_info_items()
- print('In use:', ','.join(to_display))
- unused = set(all_items) - set(to_display)
- if unused:
- print('Unused:', ' '.join(unused))
+def f_flags(args: argparse.Namespace):
+ cmd = args.flags_cmd or "ll"
+ repos = utils.get_repos()
+ if cmd == "ll":
+ for r, prop in repos.items():
+ if prop["flags"]:
+ print(f"{r}: {prop['flags']}")
+ elif cmd == "set":
+ # when in memory, flags are List[str], when on disk, they are space
+ # delimited str
+ repos[args.repo]["flags"] = args.flags
+ utils.write_to_repo_file(repos, "w")
+
+
+def f_color(args: argparse.Namespace):
+ cmd = args.color_cmd or "ll"
+ if cmd == "ll": # pragma: no cover
+ info.show_colors()
+ elif cmd == "set":
+ colors = info.get_color_encoding()
+ colors[args.situation] = args.color
+ csv_config = common.get_config_fname("color.csv")
+ with open(csv_config, "w", newline="") as f:
+ writer = csv.DictWriter(f, fieldnames=colors)
+ writer.writeheader()
+ writer.writerow(colors)
+ elif cmd == "reset":
+ Path(common.get_config_fname("color.csv")).unlink(missing_ok=True)
+
+
+def f_info(args: argparse.Namespace):
+ to_display = info.get_info_items()
+ cmd = args.info_cmd or "ll"
+ if cmd == "ll":
+ print("In use:", ",".join(to_display))
+ unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display)))
+ if unused:
+ print("Unused:", ",".join(unused))
+ return
+ if cmd == "add" and args.info_item not in to_display:
+ to_display.append(args.info_item)
+ csv_config = common.get_config_fname("info.csv")
+ with open(csv_config, "w", newline="") as f:
+ writer = csv.writer(f)
+ writer.writerow(to_display)
+ elif cmd == "rm" and args.info_item in to_display:
+ to_display.remove(args.info_item)
+ csv_config = common.get_config_fname("info.csv")
+ with open(csv_config, "w", newline="") as f:
+ writer = csv.writer(f)
+ writer.writerow(to_display)
+
+
+def f_clone(args: argparse.Namespace):
+
+ if args.dry_run:
+ if args.from_file:
+ for url, repo_name, abs_path in utils.parse_clone_config(args.clonee):
+ print(f"git clone {url} {abs_path}")
+ else:
+ print(f"git clone {args.clonee}")
+ return
+
+ if args.directory:
+ path = args.directory
+ else:
+ path = Path.cwd()
+
+ if not args.from_file:
+ subprocess.run(["git", "clone", args.clonee], cwd=path)
+ # add the cloned repo to gita; group is also supported
+ cloned_path = os.path.join(path, args.clonee.split("/")[-1].split(".")[0])
+ args.paths = [cloned_path]
+ args.recursive = args.auto_group = args.bare = args.skip_submodule = False
+ f_add(args)
+ return
+
+ if args.preserve_path:
+ utils.exec_async_tasks(
+ utils.run_async(repo_name, path, ["git", "clone", url, abs_path])
+ for url, repo_name, abs_path in utils.parse_clone_config(args.clonee)
+ )
+ else:
+ utils.exec_async_tasks(
+ utils.run_async(repo_name, path, ["git", "clone", url])
+ for url, repo_name, _ in utils.parse_clone_config(args.clonee)
+ )
+
+
+def f_freeze(args):
+ repos = utils.get_repos()
+ ctx = utils.get_context()
+ if args.group is None and ctx:
+ args.group = ctx.stem
+ group_repos = None
+ if args.group: # only display repos in this group
+ group_repos = utils.get_groups()[args.group]["repos"]
+ repos = {k: repos[k] for k in group_repos if k in repos}
+ seen = {""}
+ for name, prop in repos.items():
+ path = prop["path"]
+ url = ""
+ # FIXME: capture_output is new in 3.7. Maybe drop support for 3.6
+ cp = subprocess.run(
+ ["git", "remote", "-v"],
+ cwd=path,
+ universal_newlines=True,
+ capture_output=True,
+ )
+ lines = cp.stdout.split("\n")
+ if cp.returncode == 0 and len(lines) > 0:
+ parts = lines[0].split()
+ if len(parts) > 1:
+ url = parts[1]
+ if url not in seen:
+ seen.add(url)
+ print(f"{url},{name},{path}")
def f_ll(args: argparse.Namespace):
@@ -45,62 +219,128 @@ def f_ll(args: argparse.Namespace):
Display details of all repos
"""
repos = utils.get_repos()
+ ctx = utils.get_context()
+ if args.group is None and ctx:
+ args.group = ctx.stem
+ group_repos = None
if args.group: # only display repos in this group
- group_repos = utils.get_groups()[args.group]
+ group_repos = utils.get_groups()[args.group]["repos"]
repos = {k: repos[k] for k in group_repos if k in repos}
- for line in utils.describe(repos):
- print(line)
+ if args.g: # display by group
+ if group_repos:
+ print(f"{args.group}:")
+ for line in utils.describe(repos, no_colors=args.no_colors):
+ print(" ", line)
+ else:
+ for g, prop in utils.get_groups().items():
+ print(f"{g}:")
+ g_repos = {k: repos[k] for k in prop["repos"] if k in repos}
+ for line in utils.describe(g_repos, no_colors=args.no_colors):
+ print(" ", line)
+ else:
+ for line in utils.describe(repos, no_colors=args.no_colors):
+ print(line)
def f_ls(args: argparse.Namespace):
repos = utils.get_repos()
if args.repo: # one repo, show its path
- print(repos[args.repo])
+ print(repos[args.repo]["path"])
else: # show names of all repos
- print(' '.join(repos))
+ print(" ".join(repos))
def f_group(args: argparse.Namespace):
groups = utils.get_groups()
- if args.to_group:
- gname = input('group name? ')
+ cmd = args.group_cmd or "ll"
+ if cmd == "ll":
+ if "to_show" in args and args.to_show:
+ gname = args.to_show
+ print(" ".join(groups[gname]["repos"]))
+ else:
+ for group, prop in groups.items():
+ print(f"{info.Color.underline}{group}{info.Color.end}: {prop['path']}")
+ for r in prop["repos"]:
+ print(" -", r)
+ elif cmd == "ls":
+ print(" ".join(groups))
+ elif cmd == "rename":
+ new_name = args.new_name
+ gname = args.gname
+ groups[new_name] = groups[gname]
+ del groups[gname]
+ utils.write_to_groups_file(groups, "w")
+ # change context
+ ctx = utils.get_context()
+ if ctx and ctx.stem == gname:
+ utils.replace_context(ctx, new_name)
+ elif cmd == "rm":
+ ctx = utils.get_context()
+ for name in args.to_ungroup:
+ del groups[name]
+ if ctx and str(ctx.stem) == name:
+ utils.replace_context(ctx, "")
+ utils.write_to_groups_file(groups, "w")
+ elif cmd == "add":
+ gname = args.gname
if gname in groups:
- gname_repos = set(groups[gname])
+ gname_repos = set(groups[gname]["repos"])
gname_repos.update(args.to_group)
- groups[gname] = sorted(gname_repos)
- utils.write_to_groups_file(groups, 'w')
+ groups[gname]["repos"] = sorted(gname_repos)
+ if "gpath" in args:
+ groups[gname]["path"] = args.gpath
+ utils.write_to_groups_file(groups, "w")
else:
- utils.write_to_groups_file({gname: sorted(args.to_group)}, 'a+')
- else:
- for group, repos in groups.items():
- print(f"{group}: {' '.join(repos)}")
-
-
-def f_ungroup(args: argparse.Namespace):
- groups = utils.get_groups()
- to_ungroup = set(args.to_ungroup)
- to_del = []
- for name, repos in groups.items():
- remaining = set(repos) - to_ungroup
- if remaining:
- groups[name] = list(sorted(remaining))
+ gpath = ""
+ if "gpath" in args:
+ gpath = args.gpath
+ utils.write_to_groups_file(
+ {gname: {"repos": sorted(args.to_group), "path": gpath}}, "a+"
+ )
+ elif cmd == "rmrepo":
+ gname = args.gname
+ if gname in groups:
+ group = {
+ gname: {"repos": groups[gname]["repos"], "path": groups[gname]["path"]}
+ }
+ for repo in args.to_rm:
+ utils.delete_repo_from_groups(repo, group)
+ groups[gname] = group[gname]
+ utils.write_to_groups_file(groups, "w")
+
+
+def f_context(args: argparse.Namespace):
+ choice = args.choice
+ ctx = utils.get_context()
+ if choice is None: # display current context
+ if ctx:
+ group = ctx.stem
+ print(f"{group}: {' '.join(utils.get_groups()[group]['repos'])}")
+ elif (Path(common.get_config_dir()) / "auto.context").exists():
+ print("auto: none detected!")
else:
- to_del.append(name)
- for name in to_del:
- del groups[name]
- utils.write_to_groups_file(groups, 'w')
+ print("Context is not set")
+ else: # set context
+ utils.replace_context(ctx, choice)
def f_rm(args: argparse.Namespace):
"""
Unregister repo(s) from gita
"""
- path_file = utils.get_config_fname('repo_path')
+ path_file = common.get_config_fname("repos.csv")
if os.path.isfile(path_file):
repos = utils.get_repos()
+ group_updated = False
+ groups = utils.get_groups()
for repo in args.repo:
del repos[repo]
- utils.write_to_repo_file(repos, 'w')
+ up = utils.delete_repo_from_groups(repo, groups)
+ group_updated = group_updated or up
+ if group_updated:
+ utils.write_to_groups_file(groups, "w")
+
+ utils.write_to_repo_file(repos, "w")
def f_git_cmd(args: argparse.Namespace):
@@ -108,32 +348,62 @@ def f_git_cmd(args: argparse.Namespace):
Delegate git command/alias defined in `args.cmd`. Asynchronous execution is
disabled for commands in the `args.async_blacklist`.
"""
- repos = utils.get_repos()
- groups = utils.get_groups()
- if args.repo: # with user specified repo(s) or group(s)
- chosen = {}
- for k in args.repo:
- if k in repos:
- chosen[k] = repos[k]
- if k in groups:
- for r in groups[k]:
- chosen[r] = repos[r]
- repos = chosen
- cmds = ['git'] + args.cmd
- if len(repos) == 1 or cmds[1] in args.async_blacklist:
- for path in repos.values():
+ if "_parsed_repos" in args:
+ repos = args._parsed_repos
+ else:
+ repos, _ = utils.parse_repos_and_rest(args.repo)
+
+ per_repo_cmds = []
+ for prop in repos.values():
+ cmds = args.cmd.copy()
+ if cmds[0] == "git" and prop["flags"]:
+ cmds[1:1] = prop["flags"]
+ per_repo_cmds.append(cmds)
+
+ # This async blacklist mechanism is broken if the git command name does
+ # not match with the gita command name.
+ if len(repos) == 1 or args.cmd[1] in args.async_blacklist:
+ for prop, cmds in zip(repos.values(), per_repo_cmds):
+ path = prop["path"]
print(path)
- subprocess.run(cmds, cwd=path)
+ subprocess.run(cmds, cwd=path, shell=args.shell)
else: # run concurrent subprocesses
# Async execution cannot deal with multiple repos' user name/password.
# Here we shut off any user input in the async execution, and re-run
# the failed ones synchronously.
errors = utils.exec_async_tasks(
- utils.run_async(repo_name, path, cmds) for repo_name, path in repos.items())
+ utils.run_async(repo_name, prop["path"], cmds)
+ for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items())
+ )
for path in errors:
if path:
print(path)
- subprocess.run(cmds, cwd=path)
+ # FIXME: This is broken, flags are missing. But probably few
+ # people will use `gita flags`
+ subprocess.run(args.cmd, cwd=path)
+
+
+def f_shell(args):
+ """
+ Delegate shell command defined in `args.man`, which may or may not
+ contain repo names.
+ """
+ repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode)
+ if not cmds:
+ print("Missing commands")
+ sys.exit(2)
+
+ cmds = " ".join(cmds) # join the shell command into a single string
+ for name, prop in repos.items():
+ # TODO: pull this out as a function
+ got = subprocess.run(
+ cmds,
+ cwd=prop["path"],
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ print(utils.format_output(got.stdout.decode(), name))
def f_super(args):
@@ -141,59 +411,195 @@ def f_super(args):
Delegate git command/alias defined in `args.man`, which may or may not
contain repo names.
"""
- names = []
- repos = utils.get_repos()
- groups = utils.get_groups()
- for i, word in enumerate(args.man):
- if word in repos or word in groups:
- names.append(word)
- else:
- break
- args.cmd = args.man[i:]
- args.repo = names
+ repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode)
+ if not cmds:
+ print("Missing commands")
+ sys.exit(2)
+
+ args.cmd = ["git"] + cmds
+ args._parsed_repos = repos
+ args.shell = False
f_git_cmd(args)
+def f_clear(_):
+ utils.write_to_groups_file({}, "w")
+ utils.write_to_repo_file({}, "w")
+
+
def main(argv=None):
- p = argparse.ArgumentParser(prog='gita',
- formatter_class=argparse.RawTextHelpFormatter,
- description=__doc__)
- subparsers = p.add_subparsers(title='sub-commands',
- help='additional help with sub-command -h')
-
- version = pkg_resources.require('gita')[0].version
- p.add_argument('-v',
- '--version',
- action='version',
- version=f'%(prog)s {version}')
+ p = argparse.ArgumentParser(
+ prog="gita", formatter_class=argparse.RawTextHelpFormatter, description=__doc__
+ )
+ subparsers = p.add_subparsers(
+ title="sub-commands", help="additional help with sub-command -h"
+ )
+
+ version = pkg_resources.require("gita")[0].version
+ p.add_argument("-v", "--version", action="version", version=f"%(prog)s {version}")
# bookkeeping sub-commands
- p_add = subparsers.add_parser('add', help='add repo(s)')
- p_add.add_argument('paths', nargs='+', help="add repo(s)")
+ p_add = subparsers.add_parser("add", description="add repo(s)", help="add repo(s)")
+ p_add.add_argument("paths", nargs="+", type=_path_name, help="repo(s) to add")
+ p_add.add_argument("-n", "--dry-run", action="store_true", help="dry run")
+ p_add.add_argument(
+ "-g",
+ "--group",
+ choices=utils.get_groups(),
+ help="add repo(s) to the specified group",
+ )
+ p_add.add_argument(
+ "-s", "--skip-submodule", action="store_true", help="skip submodule repo(s)"
+ )
+ xgroup = p_add.add_mutually_exclusive_group()
+ xgroup.add_argument(
+ "-r",
+ "--recursive",
+ action="store_true",
+ help="recursively add repo(s) in the given path(s).",
+ )
+ xgroup.add_argument(
+ "-a",
+ "--auto-group",
+ action="store_true",
+ help="recursively add repo(s) in the given path(s) "
+ "and create hierarchical groups based on folder structure.",
+ )
+ xgroup.add_argument("-b", "--bare", action="store_true", help="add bare repo(s)")
p_add.set_defaults(func=f_add)
- p_rm = subparsers.add_parser('rm', help='remove repo(s)')
- p_rm.add_argument('repo',
- nargs='+',
- choices=utils.get_repos(),
- help="remove the chosen repo(s)")
+ p_rm = subparsers.add_parser(
+ "rm", description="remove repo(s)", help="remove repo(s)"
+ )
+ p_rm.add_argument(
+ "repo", nargs="+", choices=utils.get_repos(), help="remove the chosen repo(s)"
+ )
p_rm.set_defaults(func=f_rm)
- p_rename = subparsers.add_parser('rename', help='rename a repo')
- p_rename.add_argument(
- 'repo',
- nargs=1,
- choices=utils.get_repos(),
- help="rename the chosen repo")
+ p_freeze = subparsers.add_parser(
+ "freeze",
+ description="print all repo information",
+ help="print all repo information",
+ )
+ p_freeze.add_argument(
+ "-g",
+ "--group",
+ choices=utils.get_groups(),
+ help="freeze repos in the specified group",
+ )
+ p_freeze.set_defaults(func=f_freeze)
+
+ p_clone = subparsers.add_parser(
+ "clone", description="clone repos", help="clone repos"
+ )
+ p_clone.add_argument(
+ "clonee",
+ help="A URL or a config file.",
+ )
+ p_clone.add_argument(
+ "-C",
+ "--directory",
+ help="Change to DIRECTORY before doing anything.",
+ )
+ p_clone.add_argument(
+ "-p",
+ "--preserve-path",
+ dest="preserve_path",
+ action="store_true",
+ help="clone repo(s) in their original paths",
+ )
+ p_clone.add_argument(
+ "-n",
+ "--dry-run",
+ action="store_true",
+ help="If set, show command without execution",
+ )
+ xgroup = p_clone.add_mutually_exclusive_group()
+ xgroup.add_argument(
+ "-g",
+ "--group",
+ choices=utils.get_groups(),
+ help="If set, add repo to the specified group after cloning, otherwise add to gita without group.",
+ )
+ xgroup.add_argument(
+ "-f",
+ "--from-file",
+ action="store_true",
+ help="If set, clone repos in a config file rendered from `gita freeze`",
+ )
+ p_clone.set_defaults(func=f_clone)
+
+ p_rename = subparsers.add_parser(
+ "rename", description="rename a repo", help="rename a repo"
+ )
p_rename.add_argument(
- 'new_name',
- help="new name")
+ "repo", nargs=1, choices=utils.get_repos(), help="rename the chosen repo"
+ )
+ p_rename.add_argument("new_name", help="new name")
p_rename.set_defaults(func=f_rename)
- p_info = subparsers.add_parser('info', help='show information items of the ll sub-command')
+ p_flags = subparsers.add_parser(
+ "flags",
+ description="Set custom git flags for repo.",
+ help="git flags configuration",
+ )
+ p_flags.set_defaults(func=f_flags)
+ flags_cmds = p_flags.add_subparsers(
+ dest="flags_cmd", help="additional help with sub-command -h"
+ )
+ flags_cmds.add_parser("ll", description="display repos with custom flags")
+ pf_set = flags_cmds.add_parser("set", description="Set flags for repo.")
+ pf_set.add_argument("repo", choices=utils.get_repos(), help="repo name")
+ pf_set.add_argument(
+ "flags", nargs=argparse.REMAINDER, help="custom flags, use quotes"
+ )
+
+ p_color = subparsers.add_parser(
+ "color",
+ description="display and modify branch coloring of the ll sub-command.",
+ help="color configuration",
+ )
+ p_color.set_defaults(func=f_color)
+ color_cmds = p_color.add_subparsers(
+ dest="color_cmd", help="additional help with sub-command -h"
+ )
+ color_cmds.add_parser(
+ "ll",
+ description="display available colors and the current branch coloring in the ll sub-command",
+ )
+ color_cmds.add_parser("reset", description="reset color scheme.")
+ pc_set = color_cmds.add_parser(
+ "set", description="Set color for local/remote situation."
+ )
+ pc_set.add_argument(
+ "situation",
+ choices=info.get_color_encoding(),
+ help="5 possible local/remote situations",
+ )
+ pc_set.add_argument(
+ "color", choices=[c.name for c in info.Color], help="available colors"
+ )
+
+ p_info = subparsers.add_parser(
+ "info",
+ description="list, add, or remove information items of the ll sub-command.",
+ help="information setting",
+ )
p_info.set_defaults(func=f_info)
-
- ll_doc = f''' status symbols:
+ info_cmds = p_info.add_subparsers(
+ dest="info_cmd", help="additional help with sub-command -h"
+ )
+ info_cmds.add_parser(
+ "ll", description="show used and unused information items of the ll sub-command"
+ )
+ info_cmds.add_parser("add", description="Enable information item.").add_argument(
+ "info_item", choices=info.ALL_INFO_ITEMS, help="information item to add"
+ )
+ info_cmds.add_parser("rm", description="Disable information item.").add_argument(
+ "info_item", choices=info.ALL_INFO_ITEMS, help="information item to delete"
+ )
+
+ ll_doc = f""" status symbols:
+: staged changes
*: unstaged changes
_: untracked files/folders
@@ -203,86 +609,213 @@ def main(argv=None):
{info.Color.green}green{info.Color.end}: local is the same as remote
{info.Color.red}red{info.Color.end}: local has diverged from remote
{info.Color.purple}purple{info.Color.end}: local is ahead of remote (good for push)
- {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)'''
- p_ll = subparsers.add_parser('ll',
- help='display summary of all repos',
- formatter_class=argparse.RawTextHelpFormatter,
- description=ll_doc)
- p_ll.add_argument('group',
- nargs='?',
- choices=utils.get_groups(),
- help="show repos in the chosen group")
+ {info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)"""
+ p_ll = subparsers.add_parser(
+ "ll",
+ help="display summary of all repos",
+ formatter_class=argparse.RawTextHelpFormatter,
+ description=ll_doc,
+ )
+ p_ll.add_argument(
+ "group",
+ nargs="?",
+ choices=utils.get_groups(),
+ help="show repos in the chosen group",
+ )
+ p_ll.add_argument(
+ "-C",
+ "--no-colors",
+ action="store_true",
+ help="Disable coloring on the branch names.",
+ )
+ p_ll.add_argument("-g", action="store_true", help="Show repo summaries by group.")
p_ll.set_defaults(func=f_ll)
+ p_context = subparsers.add_parser(
+ "context",
+ help="set context",
+ description="Set and remove context. A context is a group."
+ " When set, all operations apply only to repos in that group.",
+ )
+ p_context.add_argument(
+ "choice",
+ nargs="?",
+ choices=set().union(utils.get_groups(), {"none", "auto"}),
+ help="Without this argument, show current context. "
+ "Otherwise choose a group as context, or use 'auto', "
+ "which sets the context/group automatically based on "
+ "the current working directory. "
+ "To remove context, use 'none'. ",
+ )
+ p_context.set_defaults(func=f_context)
+
p_ls = subparsers.add_parser(
- 'ls', help='display names of all repos, or path of a chosen repo')
- p_ls.add_argument('repo',
- nargs='?',
- choices=utils.get_repos(),
- help="show path of the chosen repo")
+ "ls",
+ help="show repo(s) or repo path",
+ description="display names of all repos, or path of a chosen repo",
+ )
+ p_ls.add_argument(
+ "repo",
+ nargs="?",
+ choices=utils.get_repos(),
+ help="show path of the chosen repo",
+ )
p_ls.set_defaults(func=f_ls)
p_group = subparsers.add_parser(
- 'group', help='group repos or display names of all groups if no repo is provided')
- p_group.add_argument('to_group',
- nargs='*',
- choices=utils.get_choices(),
- help="repo(s) to be grouped")
+ "group", description="list, add, or remove repo group(s)", help="group repos"
+ )
p_group.set_defaults(func=f_group)
-
- p_ungroup = subparsers.add_parser(
- 'ungroup', help='remove group information for repos',
- description="Remove group information on repos")
- p_ungroup.add_argument('to_ungroup',
- nargs='+',
- choices=utils.get_repos(),
- help="repo(s) to be ungrouped")
- p_ungroup.set_defaults(func=f_ungroup)
+ group_cmds = p_group.add_subparsers(
+ dest="group_cmd", help="additional help with sub-command -h"
+ )
+ pg_ll = group_cmds.add_parser("ll", description="List all groups with repos.")
+ pg_ll.add_argument(
+ "to_show", nargs="?", choices=utils.get_groups(), help="group to show"
+ )
+ group_cmds.add_parser("ls", description="List all group names.")
+ pg_add = group_cmds.add_parser("add", description="Add repo(s) to a group.")
+ pg_add.add_argument(
+ "to_group",
+ nargs="+",
+ metavar="repo",
+ choices=utils.get_repos(),
+ help="repo(s) to be grouped",
+ )
+ pg_add.add_argument(
+ "-n",
+ "--name",
+ dest="gname",
+ type=partial(_group_name, exclude_old_names=False),
+ metavar="group-name",
+ required=True,
+ )
+ pg_add.add_argument(
+ "-p", "--path", dest="gpath", type=_path_name, metavar="group-path"
+ )
+
+ pg_rmrepo = group_cmds.add_parser(
+ "rmrepo", description="remove repo(s) from a group."
+ )
+ pg_rmrepo.add_argument(
+ "to_rm",
+ nargs="+",
+ metavar="repo",
+ choices=utils.get_repos(),
+ help="repo(s) to be removed from the group",
+ )
+ pg_rmrepo.add_argument(
+ "-n",
+ "--name",
+ dest="gname",
+ metavar="group-name",
+ required=True,
+ help="group name",
+ )
+ pg_rename = group_cmds.add_parser("rename", description="Change group name.")
+ pg_rename.add_argument(
+ "gname",
+ metavar="group-name",
+ choices=utils.get_groups(),
+ help="existing group to rename",
+ )
+ pg_rename.add_argument(
+ "new_name", metavar="new-name", type=_group_name, help="new group name"
+ )
+ group_cmds.add_parser("rm", description="Remove group(s).").add_argument(
+ "to_ungroup", nargs="+", choices=utils.get_groups(), help="group(s) to delete"
+ )
# superman mode
p_super = subparsers.add_parser(
- 'super',
- help='superman mode: delegate any git command/alias in specified or '
- 'all repo(s).\n'
+ "super",
+ help="run any git command/alias",
+ description="Superman mode: delegate any git command/alias in specified repo(s), group(s), or "
+ "all repo(s).\n"
'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n'
- '\t gita super repo1 repo2 repo3 checkout new-feature')
+ "\t gita super repo1 repo2 repo3 checkout new-feature",
+ )
p_super.add_argument(
- 'man',
+ "man",
nargs=argparse.REMAINDER,
- help="execute arbitrary git command/alias for specified or all repos "
- "Example: gita super myrepo1 diff --name-only --staged "
- "Another: gita super checkout master ")
+ help="execute arbitrary git command/alias for specified repo(s), group(s), or all repos.\n"
+ "Example: gita super repo1 diff --name-only --staged;\n"
+ "gita super checkout master ",
+ )
+ p_super.add_argument(
+ "-q", "--quote-mode", action="store_true", help="use quote mode"
+ )
p_super.set_defaults(func=f_super)
+ # shell mode
+ p_shell = subparsers.add_parser(
+ "shell",
+ help="run any shell command",
+ description="shell mode: delegate any shell command in specified repo(s), group(s), or "
+ "all repo(s).\n"
+ "Examples:\n \t gita shell pwd; \n"
+ "\t gita shell repo1 repo2 repo3 touch xx",
+ )
+ p_shell.add_argument(
+ "man",
+ nargs=argparse.REMAINDER,
+ help="execute arbitrary shell command for specified repo(s), group(s), or all repos.\n"
+ "Example: gita shell myrepo1 ls\n"
+ "Another: gita shell git checkout master ",
+ )
+ p_shell.add_argument(
+ "-q", "--quote-mode", action="store_true", help="use quote mode"
+ )
+ p_shell.set_defaults(func=f_shell)
+
+ # clear
+ p_clear = subparsers.add_parser(
+ "clear",
+ description="removes all groups and repositories",
+ help="removes all groups and repositories",
+ )
+ p_clear.set_defaults(func=f_clear)
+
# sub-commands that fit boilerplate
cmds = utils.get_cmds_from_files()
for name, data in cmds.items():
- help = data.get('help')
- cmd = data.get('cmd') or name
- if data.get('allow_all'):
+ help = data.get("help")
+ cmd = data["cmd"]
+ if data.get("allow_all"):
choices = utils.get_choices()
- nargs = '*'
- help += ' for all repos or'
+ nargs = "*"
+ help += " for all repos or"
else:
choices = utils.get_repos().keys() | utils.get_groups().keys()
- nargs = '+'
- help += ' for the chosen repo(s) or group(s)'
- sp = subparsers.add_parser(name, help=help)
- sp.add_argument('repo', nargs=nargs, choices=choices, help=help)
- sp.set_defaults(func=f_git_cmd, cmd=cmd.split())
+ nargs = "+"
+ help += " for the chosen repo(s) or group(s)"
+ sp = subparsers.add_parser(name, description=help)
+ sp.add_argument("repo", nargs=nargs, choices=choices, help=help)
+ is_shell = bool(data.get("shell"))
+ sp.add_argument(
+ "-s",
+ "--shell",
+ default=is_shell,
+ type=bool,
+ help="If set, run in shell mode",
+ )
+ if is_shell:
+ cmd = [cmd]
+ else:
+ cmd = cmd.split()
+ sp.set_defaults(func=f_git_cmd, cmd=cmd)
args = p.parse_args(argv)
args.async_blacklist = {
- name
- for name, data in cmds.items() if data.get('disable_async')
+ name for name, data in cmds.items() if data.get("disable_async")
}
- if 'func' in args:
+ if "func" in args:
args.func(args)
else:
p.print_help() # pragma: no cover
-if __name__ == '__main__':
+if __name__ == "__main__":
main() # pragma: no cover
diff --git a/gita/cmds.json b/gita/cmds.json
new file mode 100644
index 0000000..eadda81
--- /dev/null
+++ b/gita/cmds.json
@@ -0,0 +1,90 @@
+{
+"br":{
+ "cmd": "git branch -vv",
+ "help":"show local branches"},
+"clean":{
+ "cmd": "git clean -dfx",
+ "help": "remove all untracked files/folders"},
+"diff":{
+ "cmd": "git diff",
+ "help": "git show differences"},
+"difftool":{
+ "cmd": "git difftool",
+ "disable_async": true,
+ "help": "show differences using a tool"
+ },
+"fetch":{
+ "cmd": "git fetch",
+ "allow_all": true,
+ "help": "fetch remote update"
+ },
+"last":{
+ "cmd": "git log -1 HEAD",
+ "help": "show log information of HEAD"
+ },
+"log":
+ {"cmd": "git log",
+ "disable_async": true,
+ "help": "show logs"
+ },
+"merge":{
+ "cmd": "git merge @{u}",
+ "help": "merge remote updates"
+ },
+"mergetool":{
+ "cmd": "git mergetool",
+ "disable_async": true,
+ "help": "merge updates with a tool"
+ },
+"patch":{
+ "cmd": "git format-patch HEAD~",
+ "help": "make a patch"
+ },
+"pull":{
+ "cmd": "git pull",
+ "allow_all": true,
+ "help": "pull remote updates"
+ },
+"push":{
+ "cmd": "git push",
+ "allow_all": true,
+ "help": "push the local updates"
+ },
+"rebase":{
+ "cmd": "git rebase",
+ "help": "rebase from master"
+ },
+"reflog":{
+ "cmd": "git reflog",
+ "help": "show ref logs"
+ },
+"remote":{
+ "cmd": "git remote -v",
+ "help": "show remote settings"
+ },
+"reset":{
+ "cmd": "git reset",
+ "help": "reset repo(s)"
+ },
+"show":{
+ "cmd": "git show",
+ "disable_async": true,
+ "help": "show detailed commit information"
+ },
+"stash":{
+ "cmd": "git stash",
+ "help": "store uncommited changes"
+ },
+"stat":{
+ "cmd": "git diff --stat",
+ "help": "show edit statistics"
+ },
+"st":{
+ "cmd": "git status",
+ "help": "show status"
+ },
+"tag":{
+ "cmd": "git tag -n",
+ "help": "show tags"
+ }
+}
diff --git a/gita/cmds.yml b/gita/cmds.yml
deleted file mode 100644
index 8db932e..0000000
--- a/gita/cmds.yml
+++ /dev/null
@@ -1,65 +0,0 @@
-br:
- cmd: branch -vv
- help: show local branches
-clean:
- cmd: clean -dfx
- help: remove all untracked files/folders
-diff:
- help: show differences
-difftool:
- disable_async: true
- help: show differences using a tool
-fetch:
- allow_all: true
- help: fetch remote update
-last:
- cmd: log -1 HEAD
- help: show log information of HEAD
-log:
- disable_async: true
- help: show logs
-merge:
- cmd: merge @{u}
- help: merge remote updates
-mergetool:
- disable_async: true
- help: merge updates with a tool
-patch:
- cmd: format-patch HEAD~
- help: make a patch
-pull:
- allow_all: true
- help: pull remote updates
-push:
- help: push the local updates
-rebase:
- help: rebase from master
-reflog:
- help: show ref logs
-remote:
- cmd: remote -v
- help: show remote settings
-reset:
- help: reset repo(s)
-shortlog:
- disable_async: true
- help: show short log
-show:
- disable_async: true
- help: show detailed commit information
-show-branch:
- disable_async: true
- help: show detailed branch information
-stash:
- help: store uncommited changes
-stat:
- cmd: diff --stat
- help: show edit statistics
-st:
- help: show status
-tag:
- cmd: tag -n
- help: show tags
-whatchanged:
- disable_async: true
- help: show detailed log
diff --git a/gita/common.py b/gita/common.py
index 0a271fc..64116af 100644
--- a/gita/common.py
+++ b/gita/common.py
@@ -2,7 +2,16 @@ import os
def get_config_dir() -> str:
- parent = os.environ.get('XDG_CONFIG_HOME') or os.path.join(
- os.path.expanduser('~'), '.config')
- root = os.path.join(parent, "gita")
- return root
+ root = (
+ os.environ.get("GITA_PROJECT_HOME")
+ or os.environ.get("XDG_CONFIG_HOME")
+ or os.path.join(os.path.expanduser("~"), ".config")
+ )
+ return os.path.join(root, "gita")
+
+
+def get_config_fname(fname: str) -> str:
+ """
+ Return the file name that stores the repo locations.
+ """
+ return os.path.join(get_config_dir(), fname)
diff --git a/gita/info.py b/gita/info.py
index 18d20fd..10d8bea 100644
--- a/gita/info.py
+++ b/gita/info.py
@@ -1,146 +1,275 @@
-import os
-import sys
-import yaml
+import csv
import subprocess
+from enum import Enum
+from pathlib import Path
+from collections import namedtuple
+from functools import lru_cache, partial
from typing import Tuple, List, Callable, Dict
+
from . import common
-class Color:
+class Color(Enum):
"""
Terminal color
"""
- red = '\x1b[31m' # local diverges from remote
- green = '\x1b[32m' # local == remote
- yellow = '\x1b[33m' # local is behind
- blue = '\x1b[34m'
- purple = '\x1b[35m' # local is ahead
- cyan = '\x1b[36m'
- white = '\x1b[37m' # no remote branch
- end = '\x1b[0m'
+ black = "\x1b[30m"
+ red = "\x1b[31m" # local diverges from remote
+ green = "\x1b[32m" # local == remote
+ yellow = "\x1b[33m" # local is behind
+ blue = "\x1b[34m"
+ purple = "\x1b[35m" # local is ahead
+ cyan = "\x1b[36m"
+ white = "\x1b[37m" # no remote branch
+ end = "\x1b[0m"
+ b_black = "\x1b[30;1m"
+ b_red = "\x1b[31;1m"
+ b_green = "\x1b[32;1m"
+ b_yellow = "\x1b[33;1m"
+ b_blue = "\x1b[34;1m"
+ b_purple = "\x1b[35;1m"
+ b_cyan = "\x1b[36;1m"
+ b_white = "\x1b[37;1m"
+ underline = "\x1B[4m"
+
+ # Make f"{Color.foo}" expand to Color.foo.value .
+ #
+ # See https://stackoverflow.com/a/24487545
+ def __str__(self):
+ return f"{self.value}"
+
+
+default_colors = {
+ "no_remote": Color.white.name,
+ "in_sync": Color.green.name,
+ "diverged": Color.red.name,
+ "local_ahead": Color.purple.name,
+ "remote_ahead": Color.yellow.name,
+}
+
+
+def show_colors(): # pragma: no cover
+ """ """
+ for i, c in enumerate(Color, start=1):
+ if c != Color.end and c != Color.underline:
+ print(f"{c.value}{c.name:<8} ", end="")
+ if i % 9 == 0:
+ print()
+ print(f"{Color.end}")
+ for situation, c in sorted(get_color_encoding().items()):
+ print(f"{situation:<12}: {Color[c].value}{c:<8}{Color.end} ")
-def get_info_funcs() -> List[Callable[[str], str]]:
+
+@lru_cache()
+def get_color_encoding() -> Dict[str, str]:
+ """
+ Return color scheme for different local/remote situations.
+ In the format of {situation: color name}
+ """
+ # custom settings
+ csv_config = Path(common.get_config_fname("color.csv"))
+ if csv_config.is_file():
+ with open(csv_config, "r") as f:
+ reader = csv.DictReader(f)
+ colors = next(reader)
+ else:
+ colors = default_colors
+ return colors
+
+
+def get_info_funcs(no_colors=False) -> List[Callable[[str], str]]:
"""
Return the functions to generate `gita ll` information. All these functions
take the repo path as input and return the corresponding information as str.
See `get_path`, `get_repo_status`, `get_common_commit` for examples.
"""
- info_items, to_display = get_info_items()
- return [info_items[k] for k in to_display]
+ to_display = get_info_items()
+ # This re-definition is to make unit test mocking to work
+ all_info_items = {
+ "branch": partial(get_repo_status, no_colors=no_colors),
+ "branch_name": get_repo_branch,
+ "commit_msg": get_commit_msg,
+ "commit_time": get_commit_time,
+ "path": get_path,
+ }
+ return [all_info_items[k] for k in to_display]
-def get_info_items() -> Tuple[Dict[str, Callable[[str], str]], List[str]]:
+def get_info_items() -> List[str]:
"""
- Return the available information items for display in the `gita ll`
- sub-command, and the ones to be displayed.
- It loads custom information functions and configuration if they exist.
+ Return the information items to be displayed in the `gita ll` command.
"""
- # default settings
- info_items = {'branch': get_repo_status,
- 'commit_msg': get_commit_msg,
- 'path': get_path, }
- display_items = ['branch', 'commit_msg']
-
# custom settings
- root = common.get_config_dir()
- src_fname = os.path.join(root, 'extra_repo_info.py')
- yml_fname = os.path.join(root, 'info.yml')
- if os.path.isfile(src_fname):
- sys.path.append(root)
- from extra_repo_info import extra_info_items
- info_items.update(extra_info_items)
- if os.path.isfile(yml_fname):
- with open(yml_fname, 'r') as stream:
- display_items = yaml.load(stream, Loader=yaml.FullLoader)
- display_items = [x for x in display_items if x in info_items]
- return info_items, display_items
+ csv_config = Path(common.get_config_fname("info.csv"))
+ if csv_config.is_file():
+ with open(csv_config, "r") as f:
+ reader = csv.reader(f)
+ display_items = next(reader)
+ display_items = [x for x in display_items if x in ALL_INFO_ITEMS]
+ else:
+ # default settings
+ display_items = ["branch", "commit_msg", "commit_time"]
+ return display_items
-def get_path(path):
- return Color.cyan + path + Color.end
+def get_path(prop: Dict[str, str]) -> str:
+ return f'{Color.cyan}{prop["path"]}{Color.end}'
+# TODO: do we need to add the flags here too?
def get_head(path: str) -> str:
- result = subprocess.run('git rev-parse --abbrev-ref HEAD'.split(),
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL,
- universal_newlines=True,
- cwd=path)
+ result = subprocess.run(
+ "git symbolic-ref -q --short HEAD || git describe --tags --exact-match",
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ universal_newlines=True,
+ cwd=path,
+ )
return result.stdout.strip()
-def run_quiet_diff(args: List[str]) -> bool:
+def run_quiet_diff(flags: List[str], args: List[str], path) -> int:
"""
Return the return code of git diff `args` in quiet mode
"""
result = subprocess.run(
- ['git', 'diff', '--quiet'] + args,
+ ["git"] + flags + ["diff", "--quiet"] + args,
stderr=subprocess.DEVNULL,
+ cwd=path,
)
return result.returncode
-def get_common_commit() -> str:
+def get_common_commit(path) -> str:
"""
Return the hash of the common commit of the local and upstream branches.
"""
- result = subprocess.run('git merge-base @{0} @{u}'.split(),
- stdout=subprocess.PIPE,
- universal_newlines=True)
+ result = subprocess.run(
+ "git merge-base @{0} @{u}".split(),
+ stdout=subprocess.PIPE,
+ universal_newlines=True,
+ cwd=path,
+ )
return result.stdout.strip()
-def has_untracked() -> bool:
+def has_untracked(flags: List[str], path) -> bool:
"""
Return True if untracked file/folder exists
"""
- result = subprocess.run('git ls-files -zo --exclude-standard'.split(),
- stdout=subprocess.PIPE)
+ cmd = ["git"] + flags + "ls-files -zo --exclude-standard".split()
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, cwd=path)
return bool(result.stdout)
-def get_commit_msg(path: str) -> str:
+def get_commit_msg(prop: Dict[str, str]) -> str:
"""
Return the last commit message.
"""
# `git show-branch --no-name HEAD` is faster than `git show -s --format=%s`
- result = subprocess.run('git show-branch --no-name HEAD'.split(),
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL,
- universal_newlines=True,
- cwd=path)
+ cmd = ["git"] + prop["flags"] + "show-branch --no-name HEAD".split()
+ result = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ universal_newlines=True,
+ cwd=prop["path"],
+ )
return result.stdout.strip()
-def get_repo_status(path: str) -> str:
- head = get_head(path)
- dirty, staged, untracked, color = _get_repo_status(path)
- return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}'
+def get_commit_time(prop: Dict[str, str]) -> str:
+ """
+ Return the last commit time in parenthesis.
+ """
+ cmd = ["git"] + prop["flags"] + "log -1 --format=%cd --date=relative".split()
+ result = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ universal_newlines=True,
+ cwd=prop["path"],
+ )
+ return f"({result.stdout.strip()})"
+
+
+default_symbols = {
+ "dirty": "*",
+ "staged": "+",
+ "untracked": "?",
+ "local_ahead": "↑",
+ "remote_ahead": "↓",
+ "diverged": "⇕",
+ "in_sync": "",
+ "no_remote": "∅",
+ "": "",
+}
-def _get_repo_status(path: str) -> Tuple[str]:
+@lru_cache()
+def get_symbols() -> Dict[str, str]:
+ """
+ return status symbols with customization
+ """
+ custom = {}
+ csv_config = Path(common.get_config_fname("symbols.csv"))
+ if csv_config.is_file():
+ with open(csv_config, "r") as f:
+ reader = csv.DictReader(f)
+ custom = next(reader)
+ default_symbols.update(custom)
+ return default_symbols
+
+
+def get_repo_status(prop: Dict[str, str], no_colors=False) -> str:
+ branch = get_head(prop["path"])
+ dirty, staged, untracked, situ = _get_repo_status(prop)
+ symbols = get_symbols()
+ info = f"{branch:<10} [{symbols[dirty]+symbols[staged]+symbols[untracked]+symbols[situ]}]"
+
+ if no_colors:
+ return f"{info:<18}"
+ colors = {situ: Color[name].value for situ, name in get_color_encoding().items()}
+ color = colors[situ]
+ return f"{color}{info:<18}{Color.end}"
+
+
+def get_repo_branch(prop: Dict[str, str]) -> str:
+ return get_head(prop["path"])
+
+
+def _get_repo_status(prop: Dict[str, str]) -> Tuple[str, str, str, str]:
"""
Return the status of one repo
"""
- os.chdir(path)
- dirty = '*' if run_quiet_diff([]) else ''
- staged = '+' if run_quiet_diff(['--cached']) else ''
- untracked = '_' if has_untracked() else ''
-
- diff_returncode = run_quiet_diff(['@{u}', '@{0}'])
- has_no_remote = diff_returncode == 128
- has_no_diff = diff_returncode == 0
- if has_no_remote:
- color = Color.white
- elif has_no_diff:
- color = Color.green
+ path = prop["path"]
+ flags = prop["flags"]
+ dirty = "dirty" if run_quiet_diff(flags, [], path) else ""
+ staged = "staged" if run_quiet_diff(flags, ["--cached"], path) else ""
+ untracked = "untracked" if has_untracked(flags, path) else ""
+
+ diff_returncode = run_quiet_diff(flags, ["@{u}", "@{0}"], path)
+ if diff_returncode == 128:
+ situ = "no_remote"
+ elif diff_returncode == 0:
+ situ = "in_sync"
else:
- common_commit = get_common_commit()
- outdated = run_quiet_diff(['@{u}', common_commit])
+ common_commit = get_common_commit(path)
+ outdated = run_quiet_diff(flags, ["@{u}", common_commit], path)
if outdated:
- diverged = run_quiet_diff(['@{0}', common_commit])
- color = Color.red if diverged else Color.yellow
+ diverged = run_quiet_diff(flags, ["@{0}", common_commit], path)
+ situ = "diverged" if diverged else "remote_ahead"
else: # local is ahead of remote
- color = Color.purple
- return dirty, staged, untracked, color
+ situ = "local_ahead"
+ return dirty, staged, untracked, situ
+
+
+ALL_INFO_ITEMS = {
+ "branch",
+ "branch_name",
+ "commit_msg",
+ "commit_time",
+ "path",
+}
diff --git a/gita/utils.py b/gita/utils.py
index d14484a..6746d7f 100644
--- a/gita/utils.py
+++ b/gita/utils.py
@@ -1,61 +1,160 @@
+import sys
import os
-import yaml
+import json
+import csv
import asyncio
import platform
+import subprocess
from functools import lru_cache
-from typing import List, Dict, Coroutine, Union
+from pathlib import Path
+from typing import List, Dict, Coroutine, Union, Iterator, Tuple
+from collections import Counter, defaultdict
+from concurrent.futures import ThreadPoolExecutor
+import multiprocessing
from . import info
from . import common
-def get_config_fname(fname: str) -> str:
+MAX_INT = sys.maxsize
+
+
+def get_relative_path(kid: os.PathLike, parent: str) -> Union[List[str], None]:
"""
- Return the file name that stores the repo locations.
+ Return the relative path depth if relative, otherwise None.
+
+ Both the `kid` and `parent` should be absolute paths
"""
- root = common.get_config_dir()
- return os.path.join(root, fname)
+ if parent == "":
+ return None
+
+ p_kid = Path(kid)
+ # p_kid = Path(kid).resolve()
+ try:
+ p_rel = p_kid.relative_to(parent)
+ except ValueError:
+ return None
+ rel = str(p_rel).split(os.sep)
+ if rel == ["."]:
+ rel = []
+ return rel
@lru_cache()
-def get_repos() -> Dict[str, str]:
+def get_repos() -> Dict[str, Dict[str, str]]:
"""
- Return a `dict` of repo name to repo absolute path
+ Return a `dict` of repo name to repo absolute path and repo type
+
"""
- path_file = get_config_fname('repo_path')
+ path_file = common.get_config_fname("repos.csv")
repos = {}
- # Each line is a repo path and repo name separated by ,
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
with open(path_file) as f:
- for line in f:
- line = line.rstrip()
- if not line: # blank line
- continue
- path, name = line.split(',')
- if not is_git(path):
- continue
- if name not in repos:
- repos[name] = path
- else: # repo name collision for different paths: include parent path name
- par_name = os.path.basename(os.path.dirname(path))
- repos[os.path.join(par_name, name)] = path
+ rows = csv.DictReader(
+ f, ["path", "name", "type", "flags"], restval=""
+ ) # it's actually a reader
+ repos = {
+ r["name"]: {
+ "path": r["path"],
+ "type": r["type"],
+ "flags": r["flags"].split(),
+ }
+ for r in rows
+ if is_git(r["path"], include_bare=True)
+ }
return repos
@lru_cache()
-def get_groups() -> Dict[str, List[str]]:
+def get_context() -> Union[Path, None]:
+ """
+ Return context file path, or None if not set. Note that if in auto context
+ mode, the return value is not auto.context but the resolved context,
+ which could be None.
+
+ """
+ config_dir = Path(common.get_config_dir())
+ matches = list(config_dir.glob("*.context"))
+ if len(matches) > 1:
+ print("Cannot have multiple .context file")
+ sys.exit(1)
+ if not matches:
+ return None
+ ctx = matches[0]
+ if ctx.stem == "auto":
+ # The context is set to be the group with minimal distance to cwd
+ candidate = None
+ min_dist = MAX_INT
+ for gname, prop in get_groups().items():
+ rel = get_relative_path(Path.cwd(), prop["path"])
+ if rel is None:
+ continue
+ d = len(rel)
+ if d < min_dist:
+ candidate = gname
+ min_dist = d
+ if not candidate:
+ ctx = None
+ else:
+ ctx = ctx.with_name(f"{candidate}.context")
+ return ctx
+
+
+@lru_cache()
+def get_groups() -> Dict[str, Dict[str, Union[str, List]]]:
"""
- Return a `dict` of group name to repo names.
+ Return a `dict` of group name to group properties such as repo names and
+ group path.
"""
- fname = get_config_fname('groups.yml')
+ fname = common.get_config_fname("groups.csv")
groups = {}
- # Each line is a repo path and repo name separated by ,
+ repos = get_repos()
+ # Each line is: group-name:repo1 repo2 repo3:group-path
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
- with open(fname, 'r') as f:
- groups = yaml.load(f, Loader=yaml.FullLoader)
+ with open(fname, "r") as f:
+ rows = csv.DictReader(
+ f, ["name", "repos", "path"], restval="", delimiter=":"
+ )
+ # filter out invalid repos
+ groups = {
+ r["name"]: {
+ "repos": [repo for repo in r["repos"].split() if repo in repos],
+ "path": r["path"],
+ }
+ for r in rows
+ }
return groups
+def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool:
+ """
+ Delete repo from groups
+ """
+ deleted = False
+ for name in groups:
+ try:
+ groups[name]["repos"].remove(repo)
+ except ValueError as e:
+ pass
+ else:
+ deleted = True
+ return deleted
+
+
+def replace_context(old: Union[Path, None], new: str):
+ """ """
+ auto = Path(common.get_config_dir()) / "auto.context"
+ if auto.exists():
+ old = auto
+
+ if new == "none": # delete
+ old and old.unlink()
+ elif old:
+ # ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
+ old.rename(old.with_name(f"{new}.context"))
+ else:
+ Path(auto.with_name(f"{new}.context")).write_text("")
+
def get_choices() -> List[Union[str, None]]:
"""
@@ -72,67 +171,209 @@ def get_choices() -> List[Union[str, None]]:
return choices
-def is_git(path: str) -> bool:
+def is_submodule_repo(p: Path) -> bool:
+ """ """
+ if p.is_file() and ".git/modules" in p.read_text():
+ return True
+ return False
+
+
+def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool:
"""
Return True if the path is a git repo.
"""
+ if not os.path.exists(path):
+ return False
# An alternative is to call `git rev-parse --is-inside-work-tree`
# I don't see why that one is better yet.
- # For a regular git repo, .git is a folder, for a worktree repo, .git is a file.
- # However, git submodule repo also has .git as a file.
+ # For a regular git repo, .git is a folder. For a worktree repo and
+ # submodule repo, .git is a file.
# A more reliable way to differentiable regular and worktree repos is to
# compare the result of `git rev-parse --git-dir` and
# `git rev-parse --git-common-dir`
- loc = os.path.join(path, '.git')
+ loc = os.path.join(path, ".git")
# TODO: we can display the worktree repos in a different font.
- return os.path.exists(loc)
-
-
-def rename_repo(repos: Dict[str, str], repo: str, new_name: str):
+ if os.path.exists(loc):
+ if exclude_submodule and is_submodule_repo(Path(loc)):
+ return False
+ return True
+ if not include_bare:
+ return False
+ # detect bare repo
+ got = subprocess.run(
+ "git rev-parse --is-bare-repository".split(),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ cwd=path,
+ )
+ if got.returncode == 0 and got.stdout == b"true\n":
+ return True
+ return False
+
+
+def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
"""
Write new repo name to file
"""
- path = repos[repo]
+ if new_name in repos:
+ print(f"{new_name} is already in use!")
+ return
+ prop = repos[repo]
del repos[repo]
- repos[new_name] = path
- write_to_repo_file(repos, 'w')
+ repos[new_name] = prop
+ write_to_repo_file(repos, "w")
+ groups = get_groups()
+ for g, values in groups.items():
+ members = values["repos"]
+ if repo in members:
+ members.remove(repo)
+ members.append(new_name)
+ groups[g]["repos"] = sorted(members)
+ write_to_groups_file(groups, "w")
-def write_to_repo_file(repos: Dict[str, str], mode: str):
+
+def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str):
"""
+ @param repos: each repo is {name: {properties}}
"""
- data = ''.join(f'{path},{name}\n' for name, path in repos.items())
- fname = get_config_fname('repo_path')
+ # The 3rd column is repo type; unused field
+ data = [
+ (prop["path"], name, "", " ".join(prop["flags"]))
+ for name, prop in repos.items()
+ ]
+ fname = common.get_config_fname("repos.csv")
os.makedirs(os.path.dirname(fname), exist_ok=True)
- with open(fname, mode) as f:
- f.write(data)
+ with open(fname, mode, newline="") as f:
+ writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ writer.writerows(data)
-def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
+# TODO: combine with the repo writer
+def write_to_groups_file(groups: Dict[str, Dict], mode: str):
+ """ """
+ fname = common.get_config_fname("groups.csv")
+ os.makedirs(os.path.dirname(fname), exist_ok=True)
+ if not groups: # all groups are deleted
+ Path(fname).write_text("")
+ else:
+ # delete the group if there are no repos
+ for name in list(groups):
+ if not groups[name]["repos"]:
+ del groups[name]
+ with open(fname, mode, newline="") as f:
+ data = [
+ (group, " ".join(prop["repos"]), prop["path"])
+ for group, prop in groups.items()
+ ]
+ writer = csv.writer(
+ f, delimiter=":", quotechar='"', quoting=csv.QUOTE_MINIMAL
+ )
+ writer.writerows(data)
+
+
+def _make_name(
+ path: str, repos: Dict[str, Dict[str, str]], name_counts: Counter
+) -> str:
"""
+ Given a new repo `path`, create a repo name. By default, basename is used.
+ If name collision exists, further include parent path name.
+ @param path: It should not be in `repos` and is absolute
"""
- fname = get_config_fname('groups.yml')
- os.makedirs(os.path.dirname(fname), exist_ok=True)
- with open(fname, mode) as f:
- yaml.dump(groups, f, default_flow_style=None)
-
-
-def add_repos(repos: Dict[str, str], new_paths: List[str]):
+ name = os.path.basename(os.path.normpath(path))
+ if name in repos or name_counts[name] > 1:
+ # path has no trailing /
+ par_name = os.path.basename(os.path.dirname(path))
+ return os.path.join(par_name, name)
+ return name
+
+
+def add_repos(
+ repos: Dict[str, Dict[str, str]],
+ new_paths: List[str],
+ include_bare=False,
+ exclude_submodule=False,
+ dry_run=False,
+) -> Dict[str, Dict[str, str]]:
"""
- Write new repo paths to file
+ Write new repo paths to file; return the added repos.
+
+ @param repos: name -> path
"""
- existing_paths = set(repos.values())
- new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p))
+ existing_paths = {prop["path"] for prop in repos.values()}
+ new_paths = {p for p in new_paths if is_git(p, include_bare, exclude_submodule)}
new_paths = new_paths - existing_paths
+ new_repos = {}
if new_paths:
print(f"Found {len(new_paths)} new repo(s).")
+ if dry_run:
+ for p in new_paths:
+ print(p)
+ return {}
+ name_counts = Counter(os.path.basename(os.path.normpath(p)) for p in new_paths)
new_repos = {
- os.path.basename(os.path.normpath(path)): path
- for path in new_paths}
- write_to_repo_file(new_repos, 'a+')
+ _make_name(path, repos, name_counts): {
+ "path": path,
+ "flags": "",
+ }
+ for path in new_paths
+ }
+ write_to_repo_file(new_repos, "a+")
+ else:
+ print("No new repos found!")
+ return new_repos
+
+
+def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[Tuple[str, ...], str]:
+ """
+ Return relative parent strings, and the parent head string
+
+ For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/
+ then return (b, c, d)
+ """
+ for p in paths:
+ rel = get_relative_path(repo_path, p)[:-1]
+ if rel is not None:
+ break
else:
- print('No new repos found!')
+ return (), ""
+ head, tail = os.path.split(p)
+ return (tail, *rel), head
+
+
+def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str, Dict]:
+ """
+
+ @params repos: repos to be grouped
+ """
+ # FIXME: the upstream code should make sure that paths are all independent
+ # i.e., each repo should be contained in one and only one path
+ new_groups = defaultdict(dict)
+ for repo_name, prop in repos.items():
+ hash, head = _generate_dir_hash(prop["path"], paths)
+ if not hash:
+ continue
+ for i in range(1, len(hash) + 1):
+ group_name = "-".join(hash[:i])
+ prop = new_groups[group_name]
+ prop["path"] = os.path.join(head, *hash[:i])
+ if "repos" not in prop:
+ prop["repos"] = [repo_name]
+ else:
+ prop["repos"].append(repo_name)
+ # FIXME: need to make sure the new group names don't clash with old ones
+ # or repo names
+ return new_groups
+
+
+def parse_clone_config(fname: str) -> Iterator[List[str]]:
+ """
+ Return the url, name, and path of all repos in `fname`.
+ """
+ with open(fname) as f:
+ for line in f:
+ yield line.strip().split(",")
async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]:
@@ -140,17 +381,19 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s
Run `cmds` asynchronously in `path` directory. Return the `path` if
execution fails.
"""
+ # TODO: deprecated since 3.8, will be removed in 3.10
process = await asyncio.create_subprocess_exec(
*cmds,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
start_new_session=True,
- cwd=path)
+ cwd=path,
+ )
stdout, stderr = await process.communicate()
for pipe in (stdout, stderr):
if pipe:
- print(format_output(pipe.decode(), f'{repo_name}: '))
+ print(format_output(pipe.decode(), repo_name))
# The existence of stderr is not good indicator since git sometimes write
# to stderr even if the execution is successful, e.g. git fetch
if process.returncode != 0:
@@ -161,7 +404,7 @@ def format_output(s: str, prefix: str):
"""
Prepends every line in given string with the given prefix.
"""
- return ''.join([f'{prefix}{line}' for line in s.splitlines(keepends=True)])
+ return "".join([f"{prefix}: {line}" for line in s.splitlines(keepends=True)])
def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
@@ -169,7 +412,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
Execute tasks asynchronously
"""
# TODO: asyncio API is nicer in python 3.7
- if platform.system() == 'Windows':
+ if platform.system() == "Windows":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
@@ -182,17 +425,21 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
return errors
-def describe(repos: Dict[str, str]) -> str:
+def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
"""
Return the status of all repos
"""
if repos:
- name_width = max(len(n) for n in repos) + 1
- funcs = info.get_info_funcs()
- for name in sorted(repos):
- path = repos[name]
- display_items = ' '.join(f(path) for f in funcs)
- yield f'{name:<{name_width}}{display_items}'
+ name_width = len(max(repos, key=len)) + 1
+ funcs = info.get_info_funcs(no_colors=no_colors)
+
+ num_threads = min(multiprocessing.cpu_count(), len(repos))
+ with ThreadPoolExecutor(max_workers=num_threads) as executor:
+ for line in executor.map(
+ lambda name: f'{name:<{name_width}}{" ".join(f(repos[name]) for f in funcs)}',
+ sorted(repos),
+ ):
+ yield line
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
@@ -208,18 +455,59 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
}
"""
# default config file
- fname = os.path.join(os.path.dirname(__file__), "cmds.yml")
- with open(fname, 'r') as stream:
- cmds = yaml.load(stream, Loader=yaml.FullLoader)
+ fname = os.path.join(os.path.dirname(__file__), "cmds.json")
+ with open(fname, "r") as f:
+ cmds = json.load(f)
# custom config file
root = common.get_config_dir()
- fname = os.path.join(root, 'cmds.yml')
+ fname = os.path.join(root, "cmds.json")
custom_cmds = {}
if os.path.isfile(fname) and os.path.getsize(fname):
- with open(fname, 'r') as stream:
- custom_cmds = yaml.load(stream, Loader=yaml.FullLoader)
+ with open(fname, "r") as f:
+ custom_cmds = json.load(f)
# custom commands shadow default ones
cmds.update(custom_cmds)
return cmds
+
+
+def parse_repos_and_rest(
+ input: List[str],
+ quote_mode=False,
+) -> Tuple[Dict[str, Dict[str, str]], List[str]]:
+ """
+ Parse gita input arguments
+
+ @return: repos and the rest (e.g., gita shell and super commands)
+ """
+ i = 0
+ names = []
+ repos = get_repos()
+ groups = get_groups()
+ ctx = get_context()
+ for i, word in enumerate(input):
+ if word in repos or word in groups:
+ names.append(word)
+ else:
+ break
+ else: # all input is repos and groups, shift the index once more
+ if i is not None:
+ i += 1
+ if not names and ctx:
+ names = [ctx.stem]
+ if quote_mode and i + 1 != len(input):
+ print(input[i], "is not a repo or group")
+ sys.exit(2)
+
+ if names:
+ chosen = {}
+ for k in names:
+ if k in repos:
+ chosen[k] = repos[k]
+ if k in groups:
+ for r in groups[k]["repos"]:
+ chosen[r] = repos[r]
+ # if not set here, all repos are chosen
+ repos = chosen
+ return repos, input[i:]