summaryrefslogtreecommitdiffstats
path: root/gita/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'gita/utils.py')
-rw-r--r--gita/utils.py148
1 files changed, 121 insertions, 27 deletions
diff --git a/gita/utils.py b/gita/utils.py
index 34fc435..7a1020c 100644
--- a/gita/utils.py
+++ b/gita/utils.py
@@ -1,3 +1,4 @@
+import sys
import os
import json
import csv
@@ -13,12 +14,27 @@ from . import info
from . import common
-# TODO: python3.9 pathlib has is_relative_to() function
-def is_relative_to(kid: str, parent: str) -> bool:
+MAX_INT = sys.maxsize
+
+
+def get_relative_path(kid: str, parent: str) -> Union[List[str], None]:
"""
- Both the `kid` and `parent` should be absolute path
+ Return the relative path depth if relative, otherwise MAX_INT.
+
+ Both the `kid` and `parent` should be absolute paths without trailing /
"""
- return parent == os.path.commonpath((kid, parent))
+ # Note that os.path.commonpath has no trailing /
+ # TODO: python3.9 pathlib has is_relative_to() function
+ # TODO: Maybe use os.path.commonprefix? since it's faster?
+ if parent == '':
+ return None
+ if parent == os.path.commonpath((kid, parent)):
+ rel = os.path.normpath(os.path.relpath(kid, parent)).split(os.sep)
+ if rel == ['.']:
+ rel = []
+ return rel
+ else:
+ return None
@lru_cache()
@@ -43,7 +59,7 @@ def get_repos(root=None) -> Dict[str, Dict[str, str]]:
cwd = os.getcwd()
for prop in repos.values():
path = prop['path']
- if prop['type'] == 'm' and is_relative_to(cwd, path):
+ if prop['type'] == 'm' and get_relative_path(cwd, path) != MAX_INT:
return get_repos(path)
return repos
@@ -51,29 +67,94 @@ def get_repos(root=None) -> Dict[str, Dict[str, str]]:
@lru_cache()
def get_context() -> Union[Path, None]:
"""
- Return the context: either a group name or 'none'
+ Return context file path, or None if not set. Note that if in auto context
+ mode, the return value is not auto.context but the resolved context,
+ which could be None.
+
"""
config_dir = Path(common.get_config_dir())
matches = list(config_dir.glob('*.context'))
- assert len(matches) < 2, "Cannot have multiple .context file"
- return matches[0] if matches else None
+ if len(matches) > 1:
+ print("Cannot have multiple .context file")
+ sys.exit(1)
+ if not matches:
+ return None
+ ctx = matches[0]
+ if ctx.stem == 'auto':
+ cwd = str(Path.cwd())
+ repos = get_repos()
+ # The context is set to be the group with minimal distance to cwd
+ candidate = None
+ min_dist = MAX_INT
+ for gname, prop in get_groups().items():
+ rel = get_relative_path(cwd, prop['path'])
+ if rel is None:
+ continue
+ d = len(rel)
+ if d < min_dist:
+ candidate = gname
+ min_dist = d
+ if not candidate:
+ ctx = None
+ else:
+ ctx = ctx.with_name(f'{candidate}.context')
+ return ctx
@lru_cache()
-def get_groups() -> Dict[str, List[str]]:
+def get_groups() -> Dict[str, Dict]:
"""
- Return a `dict` of group name to repo names.
+ Return a `dict` of group name to group properties such as repo names and
+ group path.
"""
fname = common.get_config_fname('groups.csv')
groups = {}
- # Each line is a repo path and repo name separated by ,
+ # Each line is: group-name:repo1 repo2 repo3:group-path
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
with open(fname, 'r') as f:
- rows = csv.reader(f, delimiter=':')
- groups = {r[0]: r[1].split() for r in rows}
+ rows = csv.DictReader(f, ['name', 'repos', 'path'],
+ restval='', delimiter=':')
+ groups = {
+ r['name']: {
+ 'repos': r['repos'].split(),
+ 'path': r['path']
+ }
+ for r in rows}
return groups
+def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool:
+ """
+ Delete repo from groups
+ """
+ deleted = False
+ for name in groups:
+ try:
+ groups[name]['repos'].remove(repo)
+ except ValueError as e:
+ pass
+ else:
+ deleted = True
+ return deleted
+
+
+def replace_context(old: Union[Path, None], new: str):
+ """
+
+ """
+ auto = Path(common.get_config_dir()) / 'auto.context'
+ if auto.exists():
+ old = auto
+
+ if new == 'none': # delete
+ old and old.unlink()
+ elif old:
+ # ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
+ old.rename(old.with_name(f'{new}.context'))
+ else:
+ open(auto.with_name(f'{new}.context'), 'w').close()
+
+
def get_choices() -> List[Union[str, None]]:
"""
Return all repo names, group names, and an additional empty list. The empty
@@ -117,6 +198,7 @@ def is_git(path: str, is_bare=False) -> bool:
return True
return False
+
def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
"""
Write new repo name to file
@@ -131,8 +213,9 @@ def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
main_paths = (prop['path'] for prop in repos.values() if prop['type'] == 'm')
cwd = os.getcwd()
is_local_config = True
+ # TODO: delete
for p in main_paths:
- if is_relative_to(cwd, p):
+ if get_relative_path(cwd, p) != MAX_INT:
write_to_repo_file(repos, 'w', p)
break
else: # global config
@@ -163,7 +246,8 @@ def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str, root=None):
writer.writerows(data)
-def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
+# TODO: combine with the repo writer
+def write_to_groups_file(groups: Dict[str, Dict], mode: str):
"""
"""
@@ -174,8 +258,8 @@ def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
else:
with open(fname, mode, newline='') as f:
data = [
- (group, ' '.join(repos))
- for group, repos in groups.items()
+ (group, ' '.join(prop['repos']), prop['path'])
+ for group, prop in groups.items()
]
writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerows(data)
@@ -191,11 +275,13 @@ def _make_name(path: str, repos: Dict[str, Dict[str, str]],
"""
name = os.path.basename(os.path.normpath(path))
if name in repos or name_counts[name] > 1:
+ # path has no trailing /
par_name = os.path.basename(os.path.dirname(path))
return os.path.join(par_name, name)
return name
+# TODO: delete
def _get_repo_type(path, repo_type, root) -> str:
"""
@@ -236,37 +322,45 @@ def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
return new_repos
-def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[str, ...]:
+def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[
+ Tuple[str, ...], str]:
"""
- Return relative parent strings
+ Return relative parent strings, and the parent head string
For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/
then return (b, c, d)
"""
for p in paths:
- if is_relative_to(repo_path, p):
+ rel = get_relative_path(repo_path, p)[:-1]
+ if rel is not None:
break
else:
- return ()
- return (os.path.basename(p),
- *os.path.normpath(os.path.relpath(repo_path, p)).split(os.sep)[:-1])
+ return (), ''
+ head, tail = os.path.split(p)
+ return (tail, *rel), head
def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]
- ) -> Dict[str, List[str]]:
+ ) -> Dict[str, Dict]:
"""
+ @params repos: repos to be grouped
"""
# FIXME: the upstream code should make sure that paths are all independent
# i.e., each repo should be contained in one and only one path
- new_groups = defaultdict(list)
+ new_groups = defaultdict(dict)
for repo_name, prop in repos.items():
- hash = _generate_dir_hash(prop['path'], paths)
+ hash, head = _generate_dir_hash(prop['path'], paths)
if not hash:
continue
for i in range(1, len(hash)+1):
group_name = '-'.join(hash[:i])
- new_groups[group_name].append(repo_name)
+ prop = new_groups[group_name]
+ prop['path'] = os.path.join(head, *hash[:i])
+ if 'repos' not in prop:
+ prop['repos'] = [repo_name]
+ else:
+ prop['repos'].append(repo_name)
# FIXME: need to make sure the new group names don't clash with old ones
# or repo names
return new_groups