summaryrefslogtreecommitdiffstats
path: root/gita
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-07-17 07:26:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-07-17 07:26:43 +0000
commit8fd7f9bfed753dbaa5543747569b4c2543aff03d (patch)
tree1b8854ec7e68bb7daf7e8b7db657669d930a99be /gita
parentReleasing debian version 0.12.9-1. (diff)
downloadgita-8fd7f9bfed753dbaa5543747569b4c2543aff03d.tar.xz
gita-8fd7f9bfed753dbaa5543747569b4c2543aff03d.zip
Merging upstream version 0.15.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'gita')
-rw-r--r--gita/__main__.py235
-rw-r--r--gita/cmds.json89
-rw-r--r--gita/cmds.yml65
-rw-r--r--gita/common.py17
-rw-r--r--gita/info.py103
-rw-r--r--gita/utils.py261
6 files changed, 546 insertions, 224 deletions
diff --git a/gita/__main__.py b/gita/__main__.py
index beecab2..ee4e7e7 100644
--- a/gita/__main__.py
+++ b/gita/__main__.py
@@ -16,22 +16,52 @@ https://github.com/nosarthur/gita/blob/master/.gita-completion.bash
import os
import sys
-import yaml
+import csv
import argparse
import subprocess
import pkg_resources
from itertools import chain
from pathlib import Path
+import glob
from . import utils, info, common
+def _group_name(name: str) -> str:
+ """
+
+ """
+ repos = utils.get_repos()
+ if name in repos:
+ print(f"Cannot use group name {name} since it's a repo name.")
+ sys.exit(1)
+ return name
+
+
def f_add(args: argparse.Namespace):
repos = utils.get_repos()
paths = args.paths
- if args.recursive:
- paths = chain.from_iterable(Path(p).glob('**') for p in args.paths)
- utils.add_repos(repos, paths)
+ if args.main:
+ # add to global and tag as main
+ main_repos = utils.add_repos(repos, paths, repo_type='m')
+ # add sub-repo recursively and save to local config
+ for name, prop in main_repos.items():
+ main_path = prop['path']
+ print('Inside main repo:', name)
+ #sub_paths = Path(main_path).glob('**')
+ sub_paths = glob.glob(os.path.join(main_path,'**/'), recursive=True)
+ utils.add_repos({}, sub_paths, root=main_path)
+ else:
+ if args.recursive or args.auto_group:
+ paths = chain.from_iterable(
+ glob.glob(os.path.join(p, '**/'), recursive=True)
+ for p in args.paths)
+ new_repos = utils.add_repos(repos, paths, is_bare=args.bare)
+ if args.auto_group:
+ new_groups = utils.auto_group(new_repos, args.paths)
+ if new_groups:
+ print(f'Created {len(new_groups)} new group(s).')
+ utils.write_to_groups_file(new_groups, 'a+')
def f_rename(args: argparse.Namespace):
@@ -39,16 +69,32 @@ def f_rename(args: argparse.Namespace):
utils.rename_repo(repos, args.repo[0], args.new_name)
+def f_flags(args: argparse.Namespace):
+ cmd = args.flags_cmd or 'll'
+ repos = utils.get_repos()
+ if cmd == 'll':
+ for r, prop in repos.items():
+ if prop['flags']:
+ print(f"{r}: {prop['flags']}")
+ elif cmd == 'set':
+ # when in memory, flags are List[str], when on disk, they are space
+ # delimited str
+ repos[args.repo]['flags'] = args.flags
+ utils.write_to_repo_file(repos, 'w')
+
+
def f_color(args: argparse.Namespace):
cmd = args.color_cmd or 'll'
if cmd == 'll': # pragma: no cover
info.show_colors()
elif cmd == 'set':
colors = info.get_color_encoding()
- colors[args.situation] = info.Color[args.color].value
- yml_config = common.get_config_fname('color.yml')
- with open(yml_config, 'w') as f:
- yaml.dump(colors, f, default_flow_style=None)
+ colors[args.situation] = args.color
+ csv_config = common.get_config_fname('color.csv')
+ with open(csv_config, 'w', newline='') as f:
+ writer = csv.DictWriter(f, fieldnames=colors)
+ writer.writeheader()
+ writer.writerow(colors)
def f_info(args: argparse.Namespace):
@@ -56,37 +102,53 @@ def f_info(args: argparse.Namespace):
cmd = args.info_cmd or 'll'
if cmd == 'll':
print('In use:', ','.join(to_display))
- unused = set(info.ALL_INFO_ITEMS) - set(to_display)
+ unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display)))
if unused:
- print('Unused:', ' '.join(unused))
+ print('Unused:', ','.join(unused))
return
if cmd == 'add' and args.info_item not in to_display:
to_display.append(args.info_item)
- yml_config = common.get_config_fname('info.yml')
- with open(yml_config, 'w') as f:
- yaml.dump(to_display, f, default_flow_style=None)
+ csv_config = common.get_config_fname('info.csv')
+ with open(csv_config, 'w', newline='') as f:
+ writer = csv.writer(f)
+ writer.writerow(to_display)
elif cmd == 'rm' and args.info_item in to_display:
to_display.remove(args.info_item)
- yml_config = common.get_config_fname('info.yml')
- with open(yml_config, 'w') as f:
- yaml.dump(to_display, f, default_flow_style=None)
+ csv_config = common.get_config_fname('info.csv')
+ with open(csv_config, 'w', newline='') as f:
+ writer = csv.writer(f)
+ writer.writerow(to_display)
def f_clone(args: argparse.Namespace):
path = Path.cwd()
- errors = utils.exec_async_tasks(
+ if args.preserve_path:
+ utils.exec_async_tasks(
+ utils.run_async(repo_name, path, ['git', 'clone', url, abs_path])
+ for url, repo_name, abs_path in utils.parse_clone_config(args.fname))
+ else:
+ utils.exec_async_tasks(
utils.run_async(repo_name, path, ['git', 'clone', url])
for url, repo_name, _ in utils.parse_clone_config(args.fname))
def f_freeze(_):
repos = utils.get_repos()
- for name, path in repos.items():
+ seen = {''}
+ for name, prop in repos.items():
+ path = prop['path']
+ # TODO: What do we do with main repos? Maybe give an option to print
+ # their sub-repos too.
url = ''
cp = subprocess.run(['git', 'remote', '-v'], cwd=path, capture_output=True)
- if cp.returncode == 0:
- url = cp.stdout.decode('utf-8').split('\n')[0].split()[1]
- print(f'{url},{name},{path}')
+ lines = cp.stdout.decode('utf-8').split('\n')
+ if cp.returncode == 0 and len(lines) > 0:
+ parts = lines[0].split()
+ if len(parts)>1:
+ url = parts[1]
+ if url not in seen:
+ seen.add(url)
+ print(f'{url},{name},{path}')
def f_ll(args: argparse.Namespace):
@@ -107,7 +169,7 @@ def f_ll(args: argparse.Namespace):
def f_ls(args: argparse.Namespace):
repos = utils.get_repos()
if args.repo: # one repo, show its path
- print(repos[args.repo])
+ print(repos[args.repo]['path'])
else: # show names of all repos
print(' '.join(repos))
@@ -128,6 +190,11 @@ def f_group(args: argparse.Namespace):
groups[new_name] = groups[gname]
del groups[gname]
utils.write_to_groups_file(groups, 'w')
+ # change context
+ ctx = utils.get_context()
+ if ctx and str(ctx.stem) == gname:
+ # ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
+ ctx.rename(ctx.with_name(f'{new_name}.context'))
elif cmd == 'rm':
ctx = utils.get_context()
for name in args.to_ungroup:
@@ -178,12 +245,22 @@ def f_rm(args: argparse.Namespace):
"""
Unregister repo(s) from gita
"""
- path_file = common.get_config_fname('repo_path')
+ path_file = common.get_config_fname('repos.csv')
if os.path.isfile(path_file):
repos = utils.get_repos()
+ main_paths = [prop['path'] for prop in repos.values() if prop['type'] == 'm']
+ # TODO: add test case to delete main repo from main repo
+ # only local setting should be affected instead of the global one
for repo in args.repo:
del repos[repo]
- utils.write_to_repo_file(repos, 'w')
+ # If cwd is relative to any main repo, write to local config
+ cwd = os.getcwd()
+ for p in main_paths:
+ if utils.is_relative_to(cwd, p):
+ utils.write_to_repo_file(repos, 'w', p)
+ break
+ else: # global config
+ utils.write_to_repo_file(repos, 'w')
def f_git_cmd(args: argparse.Namespace):
@@ -205,21 +282,33 @@ def f_git_cmd(args: argparse.Namespace):
for r in groups[k]:
chosen[r] = repos[r]
repos = chosen
- cmds = ['git'] + args.cmd
- if len(repos) == 1 or cmds[1] in args.async_blacklist:
- for path in repos.values():
+ per_repo_cmds = []
+ for prop in repos.values():
+ cmds = args.cmd.copy()
+ if cmds[0] == 'git' and prop['flags']:
+ cmds[1:1] = prop['flags']
+ per_repo_cmds.append(cmds)
+
+ # This async blacklist mechanism is broken if the git command name does
+ # not match with the gita command name.
+ if len(repos) == 1 or args.cmd[1] in args.async_blacklist:
+ for prop, cmds in zip(repos.values(), per_repo_cmds):
+ path = prop['path']
print(path)
- subprocess.run(cmds, cwd=path)
+ subprocess.run(cmds, cwd=path, shell=args.shell)
else: # run concurrent subprocesses
# Async execution cannot deal with multiple repos' user name/password.
# Here we shut off any user input in the async execution, and re-run
# the failed ones synchronously.
errors = utils.exec_async_tasks(
- utils.run_async(repo_name, path, cmds) for repo_name, path in repos.items())
+ utils.run_async(repo_name, prop['path'], cmds)
+ for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items()))
for path in errors:
if path:
print(path)
- subprocess.run(cmds, cwd=path)
+ # FIXME: This is broken, flags are missing. But probably few
+ # people will use `gita flags`
+ subprocess.run(args.cmd, cwd=path)
def f_shell(args):
@@ -249,10 +338,10 @@ def f_shell(args):
for r in groups[k]:
chosen[r] = repos[r]
repos = chosen
- cmds = args.man[i:]
- for name, path in repos.items():
+ cmds = ' '.join(args.man[i:]) # join the shell command into a single string
+ for name, prop in repos.items():
# TODO: pull this out as a function
- got = subprocess.run(cmds, cwd=path, check=True,
+ got = subprocess.run(cmds, cwd=prop['path'], check=True, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
print(utils.format_output(got.stdout.decode(), name))
@@ -271,8 +360,9 @@ def f_super(args):
names.append(word)
else:
break
- args.cmd = args.man[i:]
+ args.cmd = ['git'] + args.man[i:]
args.repo = names
+ args.shell = False
f_git_cmd(args)
@@ -292,9 +382,17 @@ def main(argv=None):
# bookkeeping sub-commands
p_add = subparsers.add_parser('add', description='add repo(s)',
help='add repo(s)')
- p_add.add_argument('paths', nargs='+', help="repo(s) to add")
- p_add.add_argument('-r', dest='recursive', action='store_true',
- help="recursively add repo(s) in the given path.")
+ p_add.add_argument('paths', nargs='+', type=os.path.abspath, help="repo(s) to add")
+ xgroup = p_add.add_mutually_exclusive_group()
+ xgroup.add_argument('-r', '--recursive', action='store_true',
+ help="recursively add repo(s) in the given path(s).")
+ xgroup.add_argument('-m', '--main', action='store_true',
+ help="make main repo(s), sub-repos are recursively added.")
+ xgroup.add_argument('-a', '--auto-group', action='store_true',
+ help="recursively add repo(s) in the given path(s) "
+ "and create hierarchical groups based on folder structure.")
+ xgroup.add_argument('-b', '--bare', action='store_true',
+ help="add bare repo(s)")
p_add.set_defaults(func=f_add)
p_rm = subparsers.add_parser('rm', description='remove repo(s)',
@@ -305,15 +403,22 @@ def main(argv=None):
help="remove the chosen repo(s)")
p_rm.set_defaults(func=f_rm)
- p_freeze = subparsers.add_parser('freeze', description='print all repo information')
+ p_freeze = subparsers.add_parser('freeze',
+ description='print all repo information',
+ help='print all repo information')
p_freeze.set_defaults(func=f_freeze)
- p_clone = subparsers.add_parser('clone', description='clone repos from config file')
+ p_clone = subparsers.add_parser('clone',
+ description='clone repos from config file',
+ help='clone repos from config file')
p_clone.add_argument('fname',
help='config file. Its content should be the output of `gita freeze`.')
+ p_clone.add_argument('-p', '--preserve-path', dest='preserve_path', action='store_true',
+ help="clone repo(s) in their original paths")
p_clone.set_defaults(func=f_clone)
- p_rename = subparsers.add_parser('rename', description='rename a repo')
+ p_rename = subparsers.add_parser('rename', description='rename a repo',
+ help='rename a repo')
p_rename.add_argument(
'repo',
nargs=1,
@@ -322,8 +427,25 @@ def main(argv=None):
p_rename.add_argument('new_name', help="new name")
p_rename.set_defaults(func=f_rename)
+ p_flags = subparsers.add_parser('flags',
+ description='Set custom git flags for repo.',
+ help='git flags configuration')
+ p_flags.set_defaults(func=f_flags)
+ flags_cmds = p_flags.add_subparsers(dest='flags_cmd',
+ help='additional help with sub-command -h')
+ flags_cmds.add_parser('ll',
+ description='display repos with custom flags')
+ pf_set = flags_cmds.add_parser('set',
+ description='Set flags for repo.')
+ pf_set.add_argument('repo', choices=utils.get_repos(),
+ help="repo name")
+ pf_set.add_argument('flags',
+ nargs=argparse.REMAINDER,
+ help="custom flags, use quotes")
+
p_color = subparsers.add_parser('color',
- description='display and modify branch coloring of the ll sub-command.')
+ description='display and modify branch coloring of the ll sub-command.',
+ help='color configuration')
p_color.set_defaults(func=f_color)
color_cmds = p_color.add_subparsers(dest='color_cmd',
help='additional help with sub-command -h')
@@ -339,7 +461,8 @@ def main(argv=None):
help="available colors")
p_info = subparsers.add_parser('info',
- description='list, add, or remove information items of the ll sub-command.')
+ description='list, add, or remove information items of the ll sub-command.',
+ help='information setting')
p_info.set_defaults(func=f_info)
info_cmds = p_info.add_subparsers(dest='info_cmd',
help='additional help with sub-command -h')
@@ -347,11 +470,11 @@ def main(argv=None):
description='show used and unused information items of the ll sub-command')
info_cmds.add_parser('add', description='Enable information item.'
).add_argument('info_item',
- choices=('branch', 'commit_msg', 'path'),
+ choices=info.ALL_INFO_ITEMS,
help="information item to add")
info_cmds.add_parser('rm', description='Disable information item.'
).add_argument('info_item',
- choices=('branch', 'commit_msg', 'path'),
+ choices=info.ALL_INFO_ITEMS,
help="information item to delete")
@@ -379,6 +502,7 @@ def main(argv=None):
p_ll.set_defaults(func=f_ll)
p_context = subparsers.add_parser('context',
+ help='set context',
description='Set and remove context. A context is a group.'
' When set, all operations apply only to repos in that group.')
p_context.add_argument('choice',
@@ -388,7 +512,8 @@ def main(argv=None):
p_context.set_defaults(func=f_context)
p_ls = subparsers.add_parser(
- 'ls', description='display names of all repos, or path of a chosen repo')
+ 'ls', help='show repo(s) or repo path',
+ description='display names of all repos, or path of a chosen repo')
p_ls.add_argument('repo',
nargs='?',
choices=utils.get_repos(),
@@ -396,7 +521,8 @@ def main(argv=None):
p_ls.set_defaults(func=f_ls)
p_group = subparsers.add_parser(
- 'group', description='list, add, or remove repo group(s)')
+ 'group', description='list, add, or remove repo group(s)',
+ help='group repos')
p_group.set_defaults(func=f_group)
group_cmds = p_group.add_subparsers(dest='group_cmd',
help='additional help with sub-command -h')
@@ -410,6 +536,7 @@ def main(argv=None):
help="repo(s) to be grouped")
pg_add.add_argument('-n', '--name',
dest='gname',
+ type=_group_name,
metavar='group-name',
required=True,
help="group name")
@@ -429,6 +556,7 @@ def main(argv=None):
choices=utils.get_groups(),
help="existing group to rename")
pg_rename.add_argument('new_name', metavar='new-name',
+ type=_group_name,
help="new group name")
group_cmds.add_parser('rm',
description='Remove group(s).').add_argument('to_ungroup',
@@ -439,6 +567,7 @@ def main(argv=None):
# superman mode
p_super = subparsers.add_parser(
'super',
+ help='run any git command/alias',
description='Superman mode: delegate any git command/alias in specified or '
'all repo(s).\n'
'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n'
@@ -446,7 +575,7 @@ def main(argv=None):
p_super.add_argument(
'man',
nargs=argparse.REMAINDER,
- help="execute arbitrary git command/alias for specified or all repos "
+ help="execute arbitrary git command/alias for specified or all repos\n"
"Example: gita super myrepo1 diff --name-only --staged "
"Another: gita super checkout master ")
p_super.set_defaults(func=f_super)
@@ -454,6 +583,7 @@ def main(argv=None):
# shell mode
p_shell = subparsers.add_parser(
'shell',
+ help='run any shell command',
description='shell mode: delegate any shell command in specified or '
'all repo(s).\n'
'Examples:\n \t gita shell pwd\n'
@@ -470,7 +600,7 @@ def main(argv=None):
cmds = utils.get_cmds_from_files()
for name, data in cmds.items():
help = data.get('help')
- cmd = data.get('cmd') or name
+ cmd = data['cmd']
if data.get('allow_all'):
choices = utils.get_choices()
nargs = '*'
@@ -481,7 +611,14 @@ def main(argv=None):
help += ' for the chosen repo(s) or group(s)'
sp = subparsers.add_parser(name, description=help)
sp.add_argument('repo', nargs=nargs, choices=choices, help=help)
- sp.set_defaults(func=f_git_cmd, cmd=cmd.split())
+ is_shell = bool(data.get('shell'))
+ sp.add_argument('-s', '--shell', default=is_shell, type=bool,
+ help='If set, run in shell mode')
+ if is_shell:
+ cmd = [cmd]
+ else:
+ cmd = cmd.split()
+ sp.set_defaults(func=f_git_cmd, cmd=cmd)
args = p.parse_args(argv)
diff --git a/gita/cmds.json b/gita/cmds.json
new file mode 100644
index 0000000..947a4dd
--- /dev/null
+++ b/gita/cmds.json
@@ -0,0 +1,89 @@
+{
+"br":{
+ "cmd": "git branch -vv",
+ "help":"show local branches"},
+"clean":{
+ "cmd": "git clean -dfx",
+ "help": "remove all untracked files/folders"},
+"diff":{
+ "cmd": "git diff",
+ "help": "git show differences"},
+"difftool":{
+ "cmd": "git difftool",
+ "disable_async": true,
+ "help": "show differences using a tool"
+ },
+"fetch":{
+ "cmd": "git fetch",
+ "allow_all": true,
+ "help": "fetch remote update"
+ },
+"last":{
+ "cmd": "git log -1 HEAD",
+ "help": "show log information of HEAD"
+ },
+"log":
+ {"cmd": "git log",
+ "disable_async": true,
+ "help": "show logs"
+ },
+"merge":{
+ "cmd": "git merge @{u}",
+ "help": "merge remote updates"
+ },
+"mergetool":{
+ "cmd": "git mergetool",
+ "disable_async": true,
+ "help": "merge updates with a tool"
+ },
+"patch":{
+ "cmd": "git format-patch HEAD~",
+ "help": "make a patch"
+ },
+"pull":{
+ "cmd": "git pull",
+ "allow_all": true,
+ "help": "pull remote updates"
+ },
+"push":{
+ "cmd": "git push",
+ "help": "push the local updates"
+ },
+"rebase":{
+ "cmd": "git rebase",
+ "help": "rebase from master"
+ },
+"reflog":{
+ "cmd": "git reflog",
+ "help": "show ref logs"
+ },
+"remote":{
+ "cmd": "git remote -v",
+ "help": "show remote settings"
+ },
+"reset":{
+ "cmd": "git reset",
+ "help": "reset repo(s)"
+ },
+"show":{
+ "cmd": "git show",
+ "disable_async": true,
+ "help": "show detailed commit information"
+ },
+"stash":{
+ "cmd": "git stash",
+ "help": "store uncommited changes"
+ },
+"stat":{
+ "cmd": "git diff --stat",
+ "help": "show edit statistics"
+ },
+"st":{
+ "cmd": "git status",
+ "help": "show status"
+ },
+"tag":{
+ "cmd": "git tag -n",
+ "help": "show tags"
+ }
+}
diff --git a/gita/cmds.yml b/gita/cmds.yml
deleted file mode 100644
index 8db932e..0000000
--- a/gita/cmds.yml
+++ /dev/null
@@ -1,65 +0,0 @@
-br:
- cmd: branch -vv
- help: show local branches
-clean:
- cmd: clean -dfx
- help: remove all untracked files/folders
-diff:
- help: show differences
-difftool:
- disable_async: true
- help: show differences using a tool
-fetch:
- allow_all: true
- help: fetch remote update
-last:
- cmd: log -1 HEAD
- help: show log information of HEAD
-log:
- disable_async: true
- help: show logs
-merge:
- cmd: merge @{u}
- help: merge remote updates
-mergetool:
- disable_async: true
- help: merge updates with a tool
-patch:
- cmd: format-patch HEAD~
- help: make a patch
-pull:
- allow_all: true
- help: pull remote updates
-push:
- help: push the local updates
-rebase:
- help: rebase from master
-reflog:
- help: show ref logs
-remote:
- cmd: remote -v
- help: show remote settings
-reset:
- help: reset repo(s)
-shortlog:
- disable_async: true
- help: show short log
-show:
- disable_async: true
- help: show detailed commit information
-show-branch:
- disable_async: true
- help: show detailed branch information
-stash:
- help: store uncommited changes
-stat:
- cmd: diff --stat
- help: show edit statistics
-st:
- help: show status
-tag:
- cmd: tag -n
- help: show tags
-whatchanged:
- disable_async: true
- help: show detailed log
diff --git a/gita/common.py b/gita/common.py
index ef3933d..abbef5f 100644
--- a/gita/common.py
+++ b/gita/common.py
@@ -1,16 +1,17 @@
import os
-def get_config_dir() -> str:
- parent = os.environ.get('XDG_CONFIG_HOME') or os.path.join(
- os.path.expanduser('~'), '.config')
- root = os.path.join(parent, "gita")
- return root
+def get_config_dir(root=None) -> str:
+ if root is None:
+ root = os.environ.get('XDG_CONFIG_HOME') or os.path.join(
+ os.path.expanduser('~'), '.config')
+ return os.path.join(root, "gita")
+ else:
+ return os.path.join(root, ".gita")
-def get_config_fname(fname: str) -> str:
+def get_config_fname(fname: str, root=None) -> str:
"""
Return the file name that stores the repo locations.
"""
- root = get_config_dir()
- return os.path.join(root, fname)
+ return os.path.join(get_config_dir(root), fname)
diff --git a/gita/info.py b/gita/info.py
index a8044e9..d18a097 100644
--- a/gita/info.py
+++ b/gita/info.py
@@ -1,5 +1,5 @@
import os
-import sys
+import csv
import yaml
import subprocess
from enum import Enum
@@ -31,40 +31,42 @@ class Color(str, Enum):
b_purple = '\x1b[35;1m'
b_cyan = '\x1b[36;1m'
b_white = '\x1b[37;1m'
+ underline = '\x1B[4m'
def show_colors(): # pragma: no cover
"""
"""
- names = {c.value: c.name for c in Color}
for i, c in enumerate(Color, start=1):
- if c != Color.end:
+ if c != Color.end and c != Color.underline:
print(f'{c.value}{c.name:<8} ', end='')
if i % 9 == 0:
print()
print(f'{Color.end}')
for situation, c in sorted(get_color_encoding().items()):
- print(f'{situation:<12}: {c}{names[c]:<8}{Color.end} ')
+ print(f'{situation:<12}: {Color[c].value}{c:<8}{Color.end} ')
@lru_cache()
def get_color_encoding() -> Dict[str, str]:
"""
Return color scheme for different local/remote situations.
+ In the format of {situation: color name}
"""
# custom settings
- yml_config = Path(common.get_config_fname('color.yml'))
- if yml_config.is_file():
- with open(yml_config, 'r') as stream:
- colors = yaml.load(stream, Loader=yaml.FullLoader)
+ csv_config = Path(common.get_config_fname('color.csv'))
+ if csv_config.is_file():
+ with open(csv_config, 'r') as f:
+ reader = csv.DictReader(f)
+ colors = next(reader)
else:
colors = {
- 'no-remote': Color.white.value,
- 'in-sync': Color.green.value,
- 'diverged': Color.red.value,
- 'local-ahead': Color.purple.value,
- 'remote-ahead': Color.yellow.value,
+ 'no-remote': Color.white.name,
+ 'in-sync': Color.green.name,
+ 'diverged': Color.red.name,
+ 'local-ahead': Color.purple.name,
+ 'remote-ahead': Color.yellow.name,
}
return colors
@@ -80,6 +82,7 @@ def get_info_funcs() -> List[Callable[[str], str]]:
all_info_items = {
'branch': get_repo_status,
'commit_msg': get_commit_msg,
+ 'commit_time': get_commit_time,
'path': get_path,
}
return [all_info_items[k] for k in to_display]
@@ -90,23 +93,26 @@ def get_info_items() -> List[str]:
Return the information items to be displayed in the `gita ll` command.
"""
# custom settings
- yml_config = Path(common.get_config_fname('info.yml'))
- if yml_config.is_file():
- with open(yml_config, 'r') as stream:
- display_items = yaml.load(stream, Loader=yaml.FullLoader)
+ csv_config = Path(common.get_config_fname('info.csv'))
+ if csv_config.is_file():
+ with open(csv_config, 'r') as f:
+ reader = csv.reader(f)
+ display_items = next(reader)
display_items = [x for x in display_items if x in ALL_INFO_ITEMS]
else:
# default settings
- display_items = ['branch', 'commit_msg']
+ display_items = ['branch', 'commit_msg', 'commit_time']
return display_items
-def get_path(path):
- return f'{Color.cyan}{path}{Color.end}'
+def get_path(prop: Dict[str, str]) -> str:
+ return f'{Color.cyan}{prop["path"]}{Color.end}'
+# TODO: do we need to add the flags here too?
def get_head(path: str) -> str:
- result = subprocess.run('git rev-parse --abbrev-ref HEAD'.split(),
+ result = subprocess.run('git symbolic-ref -q --short HEAD || git describe --tags --exact-match',
+ shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
@@ -114,12 +120,12 @@ def get_head(path: str) -> str:
return result.stdout.strip()
-def run_quiet_diff(args: List[str]) -> bool:
+def run_quiet_diff(flags: List[str], args: List[str]) -> int:
"""
Return the return code of git diff `args` in quiet mode
"""
result = subprocess.run(
- ['git', 'diff', '--quiet'] + args,
+ ['git'] + flags + ['diff', '--quiet'] + args,
stderr=subprocess.DEVNULL,
)
return result.returncode
@@ -135,50 +141,68 @@ def get_common_commit() -> str:
return result.stdout.strip()
-def has_untracked() -> bool:
+def has_untracked(flags: List[str]) -> bool:
"""
Return True if untracked file/folder exists
"""
- result = subprocess.run('git ls-files -zo --exclude-standard'.split(),
+ cmd = ['git'] + flags + 'ls-files -zo --exclude-standard'.split()
+ result = subprocess.run(cmd,
stdout=subprocess.PIPE)
return bool(result.stdout)
-def get_commit_msg(path: str) -> str:
+def get_commit_msg(prop: Dict[str, str]) -> str:
"""
Return the last commit message.
"""
# `git show-branch --no-name HEAD` is faster than `git show -s --format=%s`
- result = subprocess.run('git show-branch --no-name HEAD'.split(),
+ cmd = ['git'] + prop['flags'] + 'show-branch --no-name HEAD'.split()
+ result = subprocess.run(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
- cwd=path)
+ cwd=prop['path'])
return result.stdout.strip()
-def get_repo_status(path: str, no_colors=False) -> str:
- head = get_head(path)
- dirty, staged, untracked, color = _get_repo_status(path, no_colors)
+def get_commit_time(prop: Dict[str, str]) -> str:
+ """
+ Return the last commit time in parenthesis.
+ """
+ cmd = ['git'] + prop['flags'] + 'log -1 --format=%cd --date=relative'.split()
+ result = subprocess.run(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ universal_newlines=True,
+ cwd=prop['path'])
+ return f"({result.stdout.strip()})"
+
+
+def get_repo_status(prop: Dict[str, str], no_colors=False) -> str:
+ head = get_head(prop['path'])
+ dirty, staged, untracked, color = _get_repo_status(prop, no_colors)
if color:
return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}'
return f'{head+" "+dirty+staged+untracked:<10}'
-def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]:
+def _get_repo_status(prop: Dict[str, str], no_colors: bool) -> Tuple[str]:
"""
Return the status of one repo
"""
+ path = prop['path']
+ flags = prop['flags']
os.chdir(path)
- dirty = '*' if run_quiet_diff([]) else ''
- staged = '+' if run_quiet_diff(['--cached']) else ''
- untracked = '_' if has_untracked() else ''
+ dirty = '*' if run_quiet_diff(flags, []) else ''
+ staged = '+' if run_quiet_diff(flags, ['--cached']) else ''
+ untracked = '_' if has_untracked(flags) else ''
if no_colors:
return dirty, staged, untracked, ''
- colors = get_color_encoding()
- diff_returncode = run_quiet_diff(['@{u}', '@{0}'])
+ colors = {situ: Color[name].value
+ for situ, name in get_color_encoding().items()}
+ diff_returncode = run_quiet_diff(flags, ['@{u}', '@{0}'])
has_no_remote = diff_returncode == 128
has_no_diff = diff_returncode == 0
if has_no_remote:
@@ -187,9 +211,9 @@ def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]:
color = colors['in-sync']
else:
common_commit = get_common_commit()
- outdated = run_quiet_diff(['@{u}', common_commit])
+ outdated = run_quiet_diff(flags, ['@{u}', common_commit])
if outdated:
- diverged = run_quiet_diff(['@{0}', common_commit])
+ diverged = run_quiet_diff(flags, ['@{0}', common_commit])
color = colors['diverged'] if diverged else colors['remote-ahead']
else: # local is ahead of remote
color = colors['local-ahead']
@@ -199,5 +223,6 @@ def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]:
ALL_INFO_ITEMS = {
'branch': get_repo_status,
'commit_msg': get_commit_msg,
+ 'commit_time': get_commit_time,
'path': get_path,
}
diff --git a/gita/utils.py b/gita/utils.py
index 9572f02..34fc435 100644
--- a/gita/utils.py
+++ b/gita/utils.py
@@ -1,62 +1,76 @@
import os
-import yaml
+import json
+import csv
import asyncio
import platform
+import subprocess
from functools import lru_cache, partial
from pathlib import Path
-from typing import List, Dict, Coroutine, Union, Iterator
+from typing import List, Dict, Coroutine, Union, Iterator, Tuple
+from collections import Counter, defaultdict
from . import info
from . import common
-@lru_cache()
-def get_context() -> Union[Path, None]:
+# TODO: python3.9 pathlib has is_relative_to() function
+def is_relative_to(kid: str, parent: str) -> bool:
"""
- Return the context: either a group name or 'none'
+ Both the `kid` and `parent` should be absolute path
"""
- config_dir = Path(common.get_config_dir())
- matches = list(config_dir.glob('*.context'))
- assert len(matches) < 2, "Cannot have multiple .context file"
- return matches[0] if matches else None
+ return parent == os.path.commonpath((kid, parent))
@lru_cache()
-def get_repos() -> Dict[str, str]:
+def get_repos(root=None) -> Dict[str, Dict[str, str]]:
"""
- Return a `dict` of repo name to repo absolute path
+ Return a `dict` of repo name to repo absolute path and repo type
+
+ @param root: Use local config if set. If None, use either global or local
+ config depending on cwd.
"""
- path_file = common.get_config_fname('repo_path')
+ path_file = common.get_config_fname('repos.csv', root)
repos = {}
- # Each line is a repo path and repo name separated by ,
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
with open(path_file) as f:
- for line in f:
- line = line.rstrip()
- if not line: # blank line
- continue
- path, name = line.split(',')
- if not is_git(path):
- continue
- if name not in repos:
- repos[name] = path
- else: # repo name collision for different paths: include parent path name
- par_name = os.path.basename(os.path.dirname(path))
- repos[os.path.join(par_name, name)] = path
+ rows = csv.DictReader(f, ['path', 'name', 'type', 'flags'],
+ restval='') # it's actually a reader
+ repos = {r['name']:
+ {'path': r['path'], 'type': r['type'],
+ 'flags': r['flags'].split()}
+ for r in rows if is_git(r['path'], is_bare=True)}
+ if root is None: # detect if inside a main path
+ cwd = os.getcwd()
+ for prop in repos.values():
+ path = prop['path']
+ if prop['type'] == 'm' and is_relative_to(cwd, path):
+ return get_repos(path)
return repos
@lru_cache()
+def get_context() -> Union[Path, None]:
+ """
+ Return the context: either a group name or 'none'
+ """
+ config_dir = Path(common.get_config_dir())
+ matches = list(config_dir.glob('*.context'))
+ assert len(matches) < 2, "Cannot have multiple .context file"
+ return matches[0] if matches else None
+
+
+@lru_cache()
def get_groups() -> Dict[str, List[str]]:
"""
Return a `dict` of group name to repo names.
"""
- fname = common.get_config_fname('groups.yml')
+ fname = common.get_config_fname('groups.csv')
groups = {}
# Each line is a repo path and repo name separated by ,
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
with open(fname, 'r') as f:
- groups = yaml.load(f, Loader=yaml.FullLoader)
+ rows = csv.reader(f, delimiter=':')
+ groups = {r[0]: r[1].split() for r in rows}
return groups
@@ -75,10 +89,12 @@ def get_choices() -> List[Union[str, None]]:
return choices
-def is_git(path: str) -> bool:
+def is_git(path: str, is_bare=False) -> bool:
"""
Return True if the path is a git repo.
"""
+ if not os.path.exists(path):
+ return False
# An alternative is to call `git rev-parse --is-inside-work-tree`
# I don't see why that one is better yet.
# For a regular git repo, .git is a folder, for a worktree repo, .git is a file.
@@ -88,59 +104,172 @@ def is_git(path: str) -> bool:
# `git rev-parse --git-common-dir`
loc = os.path.join(path, '.git')
# TODO: we can display the worktree repos in a different font.
- return os.path.exists(loc)
-
-
-def rename_repo(repos: Dict[str, str], repo: str, new_name: str):
+ if os.path.exists(loc):
+ return True
+ if not is_bare:
+ return False
+ # detect bare repo
+ got = subprocess.run('git rev-parse --is-bare-repository'.split(),
+ stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
+ cwd=path
+ )
+ if got.returncode == 0 and got.stdout == b'true\n':
+ return True
+ return False
+
+def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
"""
Write new repo name to file
"""
- path = repos[repo]
+ if new_name in repos:
+ print(f"{new_name} is already in use!")
+ return
+ prop = repos[repo]
del repos[repo]
- repos[new_name] = path
- write_to_repo_file(repos, 'w')
-
-
-def write_to_repo_file(repos: Dict[str, str], mode: str):
+ repos[new_name] = prop
+ # write to local config if inside a main path
+ main_paths = (prop['path'] for prop in repos.values() if prop['type'] == 'm')
+ cwd = os.getcwd()
+ is_local_config = True
+ for p in main_paths:
+ if is_relative_to(cwd, p):
+ write_to_repo_file(repos, 'w', p)
+ break
+ else: # global config
+ write_to_repo_file(repos, 'w')
+ is_local_config = False
+ # update groups only when outside any main repos
+ if is_local_config:
+ return
+ groups = get_groups()
+ for g, members in groups.items():
+ if repo in members:
+ members.remove(repo)
+ members.append(new_name)
+ groups[g] = sorted(members)
+ write_to_groups_file(groups, 'w')
+
+
+def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str, root=None):
"""
+ @param repos: each repo is {name: {properties}}
"""
- data = ''.join(f'{path},{name}\n' for name, path in repos.items())
- fname = common.get_config_fname('repo_path')
+ data = [(prop['path'], name, prop['type'], ' '.join(prop['flags']))
+ for name, prop in repos.items()]
+ fname = common.get_config_fname('repos.csv', root)
os.makedirs(os.path.dirname(fname), exist_ok=True)
- with open(fname, mode) as f:
- f.write(data)
+ with open(fname, mode, newline='') as f:
+ writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ writer.writerows(data)
def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
"""
"""
- fname = common.get_config_fname('groups.yml')
+ fname = common.get_config_fname('groups.csv')
os.makedirs(os.path.dirname(fname), exist_ok=True)
if not groups: # all groups are deleted
open(fname, 'w').close()
else:
- with open(fname, mode) as f:
- yaml.dump(groups, f, default_flow_style=None)
+ with open(fname, mode, newline='') as f:
+ data = [
+ (group, ' '.join(repos))
+ for group, repos in groups.items()
+ ]
+ writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ writer.writerows(data)
+
+
+def _make_name(path: str, repos: Dict[str, Dict[str, str]],
+ name_counts: Counter) -> str:
+ """
+ Given a new repo `path`, create a repo name. By default, basename is used.
+ If name collision exists, further include parent path name.
+
+ @param path: It should not be in `repos` and is absolute
+ """
+ name = os.path.basename(os.path.normpath(path))
+ if name in repos or name_counts[name] > 1:
+ par_name = os.path.basename(os.path.dirname(path))
+ return os.path.join(par_name, name)
+ return name
-def add_repos(repos: Dict[str, str], new_paths: List[str]):
+def _get_repo_type(path, repo_type, root) -> str:
"""
- Write new repo paths to file
+
+ """
+ if repo_type != '': # explicitly set
+ return repo_type
+ if root is not None and os.path.normpath(root) == os.path.normpath(path):
+ return 'm'
+ return ''
+
+
+def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
+ repo_type='', root=None, is_bare=False) -> Dict[str, Dict[str, str]]:
+ """
+ Write new repo paths to file; return the added repos.
@param repos: name -> path
"""
- existing_paths = set(repos.values())
- new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p))
+ existing_paths = {prop['path'] for prop in repos.values()}
+ new_paths = {p for p in new_paths if is_git(p, is_bare)}
new_paths = new_paths - existing_paths
+ new_repos = {}
if new_paths:
print(f"Found {len(new_paths)} new repo(s).")
- new_repos = {
- os.path.basename(os.path.normpath(path)): path
- for path in new_paths}
- write_to_repo_file(new_repos, 'a+')
+ name_counts = Counter(
+ os.path.basename(os.path.normpath(p)) for p in new_paths
+ )
+ new_repos = {_make_name(path, repos, name_counts): {
+ 'path': path,
+ 'type': _get_repo_type(path, repo_type, root),
+ 'flags': '',
+ } for path in new_paths}
+ # When root is not None, we could optionally set its type to 'm', i.e.,
+ # main repo.
+ write_to_repo_file(new_repos, 'a+', root)
else:
print('No new repos found!')
+ return new_repos
+
+
+def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[str, ...]:
+ """
+ Return relative parent strings
+
+ For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/
+ then return (b, c, d)
+ """
+ for p in paths:
+ if is_relative_to(repo_path, p):
+ break
+ else:
+ return ()
+ return (os.path.basename(p),
+ *os.path.normpath(os.path.relpath(repo_path, p)).split(os.sep)[:-1])
+
+
+def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]
+ ) -> Dict[str, List[str]]:
+ """
+
+ """
+ # FIXME: the upstream code should make sure that paths are all independent
+ # i.e., each repo should be contained in one and only one path
+ new_groups = defaultdict(list)
+ for repo_name, prop in repos.items():
+ hash = _generate_dir_hash(prop['path'], paths)
+ if not hash:
+ continue
+ for i in range(1, len(hash)+1):
+ group_name = '-'.join(hash[:i])
+ new_groups[group_name].append(repo_name)
+ # FIXME: need to make sure the new group names don't clash with old ones
+ # or repo names
+ return new_groups
def parse_clone_config(fname: str) -> Iterator[List[str]]:
@@ -157,6 +286,7 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s
Run `cmds` asynchronously in `path` directory. Return the `path` if
execution fails.
"""
+ # TODO: deprecated since 3.8, will be removed in 3.10
process = await asyncio.create_subprocess_exec(
*cmds,
stdin=asyncio.subprocess.DEVNULL,
@@ -199,7 +329,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
return errors
-def describe(repos: Dict[str, str], no_colors: bool=False) -> str:
+def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
"""
Return the status of all repos
"""
@@ -213,9 +343,14 @@ def describe(repos: Dict[str, str], no_colors: bool=False) -> str:
funcs[idx] = partial(get_repo_status, no_colors=True)
for name in sorted(repos):
- path = repos[name]
- info_items = ' '.join(f(path) for f in funcs)
- yield f'{name:<{name_width}}{info_items}'
+ info_items = ' '.join(f(repos[name]) for f in funcs)
+ if repos[name]['type'] == 'm':
+ # ANSI color code also takes length in Python
+ name = f'{info.Color.underline}{name}{info.Color.end}'
+ width = name_width + 8
+ yield f'{name:<{width}}{info_items}'
+ else:
+ yield f'{name:<{name_width}}{info_items}'
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
@@ -231,17 +366,17 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
}
"""
# default config file
- fname = os.path.join(os.path.dirname(__file__), "cmds.yml")
- with open(fname, 'r') as stream:
- cmds = yaml.load(stream, Loader=yaml.FullLoader)
+ fname = os.path.join(os.path.dirname(__file__), "cmds.json")
+ with open(fname, 'r') as f:
+ cmds = json.load(f)
# custom config file
root = common.get_config_dir()
- fname = os.path.join(root, 'cmds.yml')
+ fname = os.path.join(root, 'cmds.json')
custom_cmds = {}
if os.path.isfile(fname) and os.path.getsize(fname):
- with open(fname, 'r') as stream:
- custom_cmds = yaml.load(stream, Loader=yaml.FullLoader)
+ with open(fname, 'r') as f:
+ custom_cmds = json.load(f)
# custom commands shadow default ones
cmds.update(custom_cmds)