diff options
-rw-r--r-- | .github/workflows/nos.yml | 20 | ||||
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | gita/__main__.py | 4 | ||||
-rw-r--r-- | gita/common.py | 2 | ||||
-rw-r--r-- | gita/info.py | 8 | ||||
-rw-r--r-- | gita/utils.py | 262 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/test_main.py | 13 | ||||
-rw-r--r-- | tests/test_utils.py | 289 |
9 files changed, 371 insertions, 233 deletions
diff --git a/.github/workflows/nos.yml b/.github/workflows/nos.yml index 9b81b05..52cf572 100644 --- a/.github/workflows/nos.yml +++ b/.github/workflows/nos.yml @@ -4,24 +4,26 @@ on: [push, pull_request] jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: matrix: - os: [ubuntu-latest, macos-latest, windows-latest] - python-version: [3.6, 3.7, 3.8, 3.9] + os: [ubuntu-20.04, macos-latest, windows-latest] + python-version: [3.6, 3.7, 3.8, 3.9, "3.10", "3.11"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependences run: | - python -m pip install --upgrade pip + python -m pip install --upgrade pip wheel pip install -r requirements.txt - pip install . + pip install -e . - name: Pytest run: | - pytest tests --cov=./gita + pytest tests --cov=./gita --cov-report=xml - name: Upload coverage to Codecov - uses: codecov/codecov-action@v2 + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} @@ -128,8 +128,8 @@ If more than one repos are specified, the `git` command runs asynchronously, with the exception of `log`, `difftool` and `mergetool`, which require non-trivial user input. -Repo configuration is saved in `$XDG_CONFIG_HOME/gita/repos.csv` -(most likely `~/.config/gita/repos.csv`). +Repo configuration global is saved in `$XDG_CONFIG_HOME/gita/repos.csv` +(most likely `~/.config/gita/repos.csv`) or if you prefered at project configuration add environment variable `GITA_PROJECT_HOME`. ## Installation diff --git a/gita/__main__.py b/gita/__main__.py index 2517958..4cc0e1e 100644 --- a/gita/__main__.py +++ b/gita/__main__.py @@ -48,10 +48,10 @@ def _group_name(name: str, exclude_old_names=True) -> str: def _path_name(name: str) -> str: """ - Return absolute path without trailing / + Return absolute path """ if name: - return os.path.abspath(name).rstrip(os.path.sep) + return os.path.abspath(name) return "" diff --git a/gita/common.py b/gita/common.py index e1a3dde..994e5e0 100644 --- a/gita/common.py +++ b/gita/common.py @@ -2,7 +2,7 @@ import os def get_config_dir() -> str: - root = os.environ.get('XDG_CONFIG_HOME') or os.path.join( + root = os.environ.get('GITA_PROJECT_HOME') or os.environ.get('XDG_CONFIG_HOME') or os.path.join( os.path.expanduser('~'), '.config') return os.path.join(root, "gita") diff --git a/gita/info.py b/gita/info.py index ee302d0..57bb1a8 100644 --- a/gita/info.py +++ b/gita/info.py @@ -9,7 +9,7 @@ from typing import Tuple, List, Callable, Dict from . import common -class Color(str, Enum): +class Color(Enum): """ Terminal color """ @@ -32,6 +32,12 @@ class Color(str, Enum): b_white = '\x1b[37;1m' underline = '\x1B[4m' + # Make f"{Color.foo}" expand to Color.foo.value . + # + # See https://stackoverflow.com/a/24487545 + def __str__(self): + return f"{self.value}" + default_colors = { 'no-remote': Color.white.name, diff --git a/gita/utils.py b/gita/utils.py index 5332255..76aebce 100644 --- a/gita/utils.py +++ b/gita/utils.py @@ -9,6 +9,8 @@ from functools import lru_cache, partial from pathlib import Path from typing import List, Dict, Coroutine, Union, Iterator, Tuple from collections import Counter, defaultdict +from concurrent.futures import ThreadPoolExecutor +import multiprocessing from . import info from . import common @@ -17,24 +19,25 @@ from . import common MAX_INT = sys.maxsize -def get_relative_path(kid: str, parent: str) -> Union[List[str], None]: +def get_relative_path(kid: os.PathLike, parent: str) -> Union[List[str], None]: """ - Return the relative path depth if relative, otherwise MAX_INT. + Return the relative path depth if relative, otherwise None. - Both the `kid` and `parent` should be absolute paths without trailing / + Both the `kid` and `parent` should be absolute paths """ - # Note that os.path.commonpath has no trailing / - # TODO: python3.9 pathlib has is_relative_to() function - # TODO: Maybe use os.path.commonprefix? since it's faster? - if parent == '': + if parent == "": return None - if parent == os.path.commonpath((kid, parent)): - rel = os.path.normpath(os.path.relpath(kid, parent)).split(os.sep) - if rel == ['.']: - rel = [] - return rel - else: + + p_kid = Path(kid) + # p_kid = Path(kid).resolve() + try: + p_rel = p_kid.relative_to(parent) + except ValueError: return None + rel = str(p_rel).split(os.sep) + if rel == ["."]: + rel = [] + return rel @lru_cache() @@ -43,16 +46,22 @@ def get_repos() -> Dict[str, Dict[str, str]]: Return a `dict` of repo name to repo absolute path and repo type """ - path_file = common.get_config_fname('repos.csv') + path_file = common.get_config_fname("repos.csv") repos = {} if os.path.isfile(path_file) and os.stat(path_file).st_size > 0: with open(path_file) as f: - rows = csv.DictReader(f, ['path', 'name', 'type', 'flags'], - restval='') # it's actually a reader - repos = {r['name']: - {'path': r['path'], 'type': r['type'], - 'flags': r['flags'].split()} - for r in rows if is_git(r['path'], include_bare=True)} + rows = csv.DictReader( + f, ["path", "name", "type", "flags"], restval="" + ) # it's actually a reader + repos = { + r["name"]: { + "path": r["path"], + "type": r["type"], + "flags": r["flags"].split(), + } + for r in rows + if is_git(r["path"], include_bare=True) + } return repos @@ -65,20 +74,19 @@ def get_context() -> Union[Path, None]: """ config_dir = Path(common.get_config_dir()) - matches = list(config_dir.glob('*.context')) + matches = list(config_dir.glob("*.context")) if len(matches) > 1: print("Cannot have multiple .context file") sys.exit(1) if not matches: return None ctx = matches[0] - if ctx.stem == 'auto': - cwd = str(Path.cwd()) + if ctx.stem == "auto": # The context is set to be the group with minimal distance to cwd candidate = None min_dist = MAX_INT for gname, prop in get_groups().items(): - rel = get_relative_path(cwd, prop['path']) + rel = get_relative_path(Path.cwd(), prop["path"]) if rel is None: continue d = len(rel) @@ -88,7 +96,7 @@ def get_context() -> Union[Path, None]: if not candidate: ctx = None else: - ctx = ctx.with_name(f'{candidate}.context') + ctx = ctx.with_name(f"{candidate}.context") return ctx @@ -98,19 +106,23 @@ def get_groups() -> Dict[str, Dict[str, Union[str, List]]]: Return a `dict` of group name to group properties such as repo names and group path. """ - fname = common.get_config_fname('groups.csv') + fname = common.get_config_fname("groups.csv") groups = {} + repos = get_repos() # Each line is: group-name:repo1 repo2 repo3:group-path if os.path.isfile(fname) and os.stat(fname).st_size > 0: - with open(fname, 'r') as f: - rows = csv.DictReader(f, ['name', 'repos', 'path'], - restval='', delimiter=':') + with open(fname, "r") as f: + rows = csv.DictReader( + f, ["name", "repos", "path"], restval="", delimiter=":" + ) + # filter out invalid repos groups = { - r['name']: { - 'repos': r['repos'].split(), - 'path': r['path'] - } - for r in rows} + r["name"]: { + "repos": [repo for repo in r["repos"].split() if repo in repos], + "path": r["path"], + } + for r in rows + } return groups @@ -121,7 +133,7 @@ def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool: deleted = False for name in groups: try: - groups[name]['repos'].remove(repo) + groups[name]["repos"].remove(repo) except ValueError as e: pass else: @@ -130,20 +142,18 @@ def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool: def replace_context(old: Union[Path, None], new: str): - """ - - """ - auto = Path(common.get_config_dir()) / 'auto.context' + """ """ + auto = Path(common.get_config_dir()) / "auto.context" if auto.exists(): old = auto - if new == 'none': # delete + if new == "none": # delete old and old.unlink() elif old: # ctx.rename(ctx.with_stem(new_name)) # only works in py3.9 - old.rename(old.with_name(f'{new}.context')) + old.rename(old.with_name(f"{new}.context")) else: - Path(auto.with_name(f'{new}.context')).write_text('') + Path(auto.with_name(f"{new}.context")).write_text("") def get_choices() -> List[Union[str, None]]: @@ -162,10 +172,8 @@ def get_choices() -> List[Union[str, None]]: def is_submodule_repo(p: Path) -> bool: - """ - - """ - if p.is_file() and '.git/modules' in p.read_text(): + """ """ + if p.is_file() and ".git/modules" in p.read_text(): return True return False @@ -183,7 +191,7 @@ def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool: # A more reliable way to differentiable regular and worktree repos is to # compare the result of `git rev-parse --git-dir` and # `git rev-parse --git-common-dir` - loc = os.path.join(path, '.git') + loc = os.path.join(path, ".git") # TODO: we can display the worktree repos in a different font. if os.path.exists(loc): if exclude_submodule and is_submodule_repo(Path(loc)): @@ -192,11 +200,13 @@ def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool: if not include_bare: return False # detect bare repo - got = subprocess.run('git rev-parse --is-bare-repository'.split(), - stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, - cwd=path - ) - if got.returncode == 0 and got.stdout == b'true\n': + got = subprocess.run( + "git rev-parse --is-bare-repository".split(), + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + cwd=path, + ) + if got.returncode == 0 and got.stdout == b"true\n": return True return False @@ -211,16 +221,16 @@ def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str): prop = repos[repo] del repos[repo] repos[new_name] = prop - write_to_repo_file(repos, 'w') + write_to_repo_file(repos, "w") groups = get_groups() for g, values in groups.items(): - members = values['repos'] + members = values["repos"] if repo in members: members.remove(repo) members.append(new_name) - groups[g]['repos'] = sorted(members) - write_to_groups_file(groups, 'w') + groups[g]["repos"] = sorted(members) + write_to_groups_file(groups, "w") def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str): @@ -228,40 +238,43 @@ def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str): @param repos: each repo is {name: {properties}} """ # The 3rd column is repo type; unused field - data = [(prop['path'], name, '', ' '.join(prop['flags'])) - for name, prop in repos.items()] - fname = common.get_config_fname('repos.csv') + data = [ + (prop["path"], name, "", " ".join(prop["flags"])) + for name, prop in repos.items() + ] + fname = common.get_config_fname("repos.csv") os.makedirs(os.path.dirname(fname), exist_ok=True) - with open(fname, mode, newline='') as f: - writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + with open(fname, mode, newline="") as f: + writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) writer.writerows(data) # TODO: combine with the repo writer def write_to_groups_file(groups: Dict[str, Dict], mode: str): - """ - - """ - fname = common.get_config_fname('groups.csv') + """ """ + fname = common.get_config_fname("groups.csv") os.makedirs(os.path.dirname(fname), exist_ok=True) if not groups: # all groups are deleted - Path(fname).write_text('') + Path(fname).write_text("") else: # delete the group if there are no repos for name in list(groups): - if not groups[name]['repos']: + if not groups[name]["repos"]: del groups[name] - with open(fname, mode, newline='') as f: + with open(fname, mode, newline="") as f: data = [ - (group, ' '.join(prop['repos']), prop['path']) - for group, prop in groups.items() - ] - writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL) + (group, " ".join(prop["repos"]), prop["path"]) + for group, prop in groups.items() + ] + writer = csv.writer( + f, delimiter=":", quotechar='"', quoting=csv.QUOTE_MINIMAL + ) writer.writerows(data) -def _make_name(path: str, repos: Dict[str, Dict[str, str]], - name_counts: Counter) -> str: +def _make_name( + path: str, repos: Dict[str, Dict[str, str]], name_counts: Counter +) -> str: """ Given a new repo `path`, create a repo name. By default, basename is used. If name collision exists, further include parent path name. @@ -276,17 +289,19 @@ def _make_name(path: str, repos: Dict[str, Dict[str, str]], return name -def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str], - include_bare=False, - exclude_submodule=False, - dry_run=False, - ) -> Dict[str, Dict[str, str]]: +def add_repos( + repos: Dict[str, Dict[str, str]], + new_paths: List[str], + include_bare=False, + exclude_submodule=False, + dry_run=False, +) -> Dict[str, Dict[str, str]]: """ Write new repo paths to file; return the added repos. @param repos: name -> path """ - existing_paths = {prop['path'] for prop in repos.values()} + existing_paths = {prop["path"] for prop in repos.values()} new_paths = {p for p in new_paths if is_git(p, include_bare, exclude_submodule)} new_paths = new_paths - existing_paths new_repos = {} @@ -296,21 +311,21 @@ def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str], for p in new_paths: print(p) return {} - name_counts = Counter( - os.path.basename(os.path.normpath(p)) for p in new_paths - ) - new_repos = {_make_name(path, repos, name_counts): { - 'path': path, - 'flags': '', - } for path in new_paths} - write_to_repo_file(new_repos, 'a+') + name_counts = Counter(os.path.basename(os.path.normpath(p)) for p in new_paths) + new_repos = { + _make_name(path, repos, name_counts): { + "path": path, + "flags": "", + } + for path in new_paths + } + write_to_repo_file(new_repos, "a+") else: - print('No new repos found!') + print("No new repos found!") return new_repos -def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[ - Tuple[str, ...], str]: +def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[Tuple[str, ...], str]: """ Return relative parent strings, and the parent head string @@ -322,13 +337,12 @@ def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[ if rel is not None: break else: - return (), '' + return (), "" head, tail = os.path.split(p) return (tail, *rel), head -def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str] - ) -> Dict[str, Dict]: +def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str, Dict]: """ @params repos: repos to be grouped @@ -337,17 +351,17 @@ def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str] # i.e., each repo should be contained in one and only one path new_groups = defaultdict(dict) for repo_name, prop in repos.items(): - hash, head = _generate_dir_hash(prop['path'], paths) + hash, head = _generate_dir_hash(prop["path"], paths) if not hash: continue - for i in range(1, len(hash)+1): - group_name = '-'.join(hash[:i]) + for i in range(1, len(hash) + 1): + group_name = "-".join(hash[:i]) prop = new_groups[group_name] - prop['path'] = os.path.join(head, *hash[:i]) - if 'repos' not in prop: - prop['repos'] = [repo_name] + prop["path"] = os.path.join(head, *hash[:i]) + if "repos" not in prop: + prop["repos"] = [repo_name] else: - prop['repos'].append(repo_name) + prop["repos"].append(repo_name) # FIXME: need to make sure the new group names don't clash with old ones # or repo names return new_groups @@ -359,7 +373,7 @@ def parse_clone_config(fname: str) -> Iterator[List[str]]: """ with open(fname) as f: for line in f: - yield line.strip().split(',') + yield line.strip().split(",") async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]: @@ -374,7 +388,8 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, start_new_session=True, - cwd=path) + cwd=path, + ) stdout, stderr = await process.communicate() for pipe in (stdout, stderr): if pipe: @@ -389,7 +404,7 @@ def format_output(s: str, prefix: str): """ Prepends every line in given string with the given prefix. """ - return ''.join([f'{prefix}: {line}' for line in s.splitlines(keepends=True)]) + return "".join([f"{prefix}: {line}" for line in s.splitlines(keepends=True)]) def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: @@ -397,7 +412,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]: Execute tasks asynchronously """ # TODO: asyncio API is nicer in python 3.7 - if platform.system() == 'Windows': + if platform.system() == "Windows": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) else: @@ -415,17 +430,20 @@ def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str: Return the status of all repos """ if repos: - name_width = max(len(n) for n in repos) + 1 - funcs = info.get_info_funcs() + name_width = len(max(repos, key=len)) + 1 + funcs = info.get_info_funcs() - get_repo_status = info.get_repo_status - if get_repo_status in funcs and no_colors: - idx = funcs.index(get_repo_status) - funcs[idx] = partial(get_repo_status, no_colors=True) + get_repo_status = info.get_repo_status + if get_repo_status in funcs and no_colors: + idx = funcs.index(get_repo_status) + funcs[idx] = partial(get_repo_status, no_colors=True) - for name in sorted(repos): - info_items = ' '.join(f(repos[name]) for f in funcs) - yield f'{name:<{name_width}}{info_items}' + num_threads = min(multiprocessing.cpu_count(), len(repos)) + with ThreadPoolExecutor(max_workers=num_threads) as executor: + for line in executor.map( + lambda repo: f'{repo:<{name_width}}{" ".join(f(repos[repo]) for f in funcs)}', + sorted(repos)): + yield line def get_cmds_from_files() -> Dict[str, Dict[str, str]]: @@ -442,15 +460,15 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]: """ # default config file fname = os.path.join(os.path.dirname(__file__), "cmds.json") - with open(fname, 'r') as f: + with open(fname, "r") as f: cmds = json.load(f) # custom config file root = common.get_config_dir() - fname = os.path.join(root, 'cmds.json') + fname = os.path.join(root, "cmds.json") custom_cmds = {} if os.path.isfile(fname) and os.path.getsize(fname): - with open(fname, 'r') as f: + with open(fname, "r") as f: custom_cmds = json.load(f) # custom commands shadow default ones @@ -458,8 +476,10 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]: return cmds -def parse_repos_and_rest(input: List[str], quote_mode=False, - ) -> Tuple[Dict[str, Dict[str, str]], List[str]]: +def parse_repos_and_rest( + input: List[str], + quote_mode=False, +) -> Tuple[Dict[str, Dict[str, str]], List[str]]: """ Parse gita input arguments @@ -481,7 +501,7 @@ def parse_repos_and_rest(input: List[str], quote_mode=False, if not names and ctx: names = [ctx.stem] if quote_mode and i + 1 != len(input): - print(input[i], 'is not a repo or group' ) + print(input[i], "is not a repo or group") sys.exit(2) if names: @@ -490,7 +510,7 @@ def parse_repos_and_rest(input: List[str], quote_mode=False, if k in repos: chosen[k] = repos[k] if k in groups: - for r in groups[k]['repos']: + for r in groups[k]["repos"]: chosen[r] = repos[r] # if not set here, all repos are chosen repos = chosen @@ -7,7 +7,7 @@ with open("README.md", encoding="utf-8") as f: setup( name="gita", packages=["gita"], - version="0.16.2", + version="0.16.3", license="MIT", description="Manage multiple git repos with sanity", long_description=long_description, diff --git a/tests/test_main.py b/tests/test_main.py index e36c2d8..0f2eeb5 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -94,14 +94,14 @@ class TestLsLl: out, err = capfd.readouterr() assert err == "" assert "gita" in out - assert info.Color.end in out + assert info.Color.end.value in out # no color on branch name __main__.main(["ll", "-C"]) out, err = capfd.readouterr() assert err == "" assert "gita" in out - assert info.Color.end not in out + assert info.Color.end.value not in out __main__.main(["ls", "gita"]) out, err = capfd.readouterr() @@ -367,8 +367,9 @@ class TestGroupCmd: assert err == "" assert "xx yy\n" == out + @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) - def test_ll(self, _, capfd): + def test_ll(self, _, __, capfd): args = argparse.Namespace() args.to_group = None args.group_cmd = None @@ -382,8 +383,9 @@ class TestGroupCmd: == "\x1b[4mxx\x1b[0m: \n - a\n - b\n\x1b[4myy\x1b[0m: \n - a\n - c\n - d\n" ) + @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) - def test_ll_with_group(self, _, capfd): + def test_ll_with_group(self, _, __, capfd): args = argparse.Namespace() args.to_group = None args.group_cmd = None @@ -394,9 +396,10 @@ class TestGroupCmd: assert err == "" assert "a c d\n" == out + @patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) @patch("gita.common.get_config_fname", return_value=GROUP_FNAME) @patch("gita.utils.write_to_groups_file") - def test_rename(self, mock_write, _): + def test_rename(self, mock_write, *_): args = argparse.Namespace() args.gname = "xx" args.new_name = "zz" diff --git a/tests/test_utils.py b/tests/test_utils.py index 7433532..9679116 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,134 +6,237 @@ from unittest.mock import patch, mock_open from gita import utils, info from conftest import ( - PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME, TEST_DIR, + PATH_FNAME, + PATH_FNAME_EMPTY, + PATH_FNAME_CLASH, + GROUP_FNAME, + TEST_DIR, ) -@pytest.mark.parametrize('input, expected', [ - ([], ({'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []}, 'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []}, 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []}}, [])), - (['st'], ({'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []}, 'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []}, 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []}}, ['st'])), - (['repo1', 'st'], ({'repo1': {'flags': [], 'path': '/a/bcd/repo1', 'type': ''}}, ['st'])), - (['repo1'], ({'repo1': {'flags': [], 'path': '/a/bcd/repo1', 'type': ''}}, [])), - ]) -@patch('gita.utils.is_git', return_value=True) -@patch('gita.common.get_config_fname', return_value=PATH_FNAME) +@pytest.mark.parametrize( + "kid, parent, expected", + [ + ("/a/b/repo", "/a/b", ["repo"]), + ("/a/b/repo", "/a", ["b", "repo"]), + ("/a/b/repo", "/a/", ["b", "repo"]), + ("/a/b/repo", "", None), + ("/a/b/repo", "/a/b/repo", []), + ], +) +def test_get_relative_path(kid, parent, expected): + assert expected == utils.get_relative_path(kid, parent) + + +@pytest.mark.parametrize( + "input, expected", + [ + ( + [], + ( + { + "repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []}, + "xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []}, + "repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []}, + }, + [], + ), + ), + ( + ["st"], + ( + { + "repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []}, + "xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []}, + "repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []}, + }, + ["st"], + ), + ), + ( + ["repo1", "st"], + ({"repo1": {"flags": [], "path": "/a/bcd/repo1", "type": ""}}, ["st"]), + ), + (["repo1"], ({"repo1": {"flags": [], "path": "/a/bcd/repo1", "type": ""}}, [])), + ], +) +@patch("gita.utils.is_git", return_value=True) +@patch("gita.common.get_config_fname", return_value=PATH_FNAME) def test_parse_repos_and_rest(mock_path_fname, _, input, expected): got = utils.parse_repos_and_rest(input) assert got == expected -@pytest.mark.parametrize('repo_path, paths, expected', [ - ('/a/b/c/repo', ['/a/b'], (('b', 'c'), '/a')), - ]) +@pytest.mark.parametrize( + "repo_path, paths, expected", + [ + ("/a/b/c/repo", ["/a/b"], (("b", "c"), "/a")), + ], +) def test_generate_dir_hash(repo_path, paths, expected): got = utils._generate_dir_hash(repo_path, paths) assert got == expected -@pytest.mark.parametrize('repos, paths, expected', [ - ({'r1': {'path': '/a/b//repo1'}, 'r2': {'path': '/a/b/repo2'}}, - ['/a/b'], {'b': {'repos': ['r1', 'r2'], 'path': '/a/b'}}), - ({'r1': {'path': '/a/b//repo1'}, 'r2': {'path': '/a/b/c/repo2'}}, - ['/a/b'], {'b': {'repos': ['r1', 'r2'], 'path': '/a/b'}, - 'b-c': {'repos': ['r2'], 'path': "/a/b/c"}}), - ({'r1': {'path': '/a/b/c/repo1'}, 'r2': {'path': '/a/b/c/repo2'}}, - ['/a/b'], {'b-c': {'repos': ['r1', 'r2'], 'path': '/a/b/c'}, - 'b': {'path': '/a/b', 'repos': ['r1', 'r2']}}), - ]) +@pytest.mark.parametrize( + "repos, paths, expected", + [ + ( + {"r1": {"path": "/a/b//repo1"}, "r2": {"path": "/a/b/repo2"}}, + ["/a/b"], + {"b": {"repos": ["r1", "r2"], "path": "/a/b"}}, + ), + ( + {"r1": {"path": "/a/b//repo1"}, "r2": {"path": "/a/b/c/repo2"}}, + ["/a/b"], + { + "b": {"repos": ["r1", "r2"], "path": "/a/b"}, + "b-c": {"repos": ["r2"], "path": "/a/b/c"}, + }, + ), + ( + {"r1": {"path": "/a/b/c/repo1"}, "r2": {"path": "/a/b/c/repo2"}}, + ["/a/b"], + { + "b-c": {"repos": ["r1", "r2"], "path": "/a/b/c"}, + "b": {"path": "/a/b", "repos": ["r1", "r2"]}, + }, + ), + ], +) def test_auto_group(repos, paths, expected): got = utils.auto_group(repos, paths) assert got == expected -@pytest.mark.parametrize('test_input, diff_return, expected', [ - ([{'abc': {'path': '/root/repo/', 'type': '', 'flags': []}}, False], - True, 'abc \x1b[31mrepo *+_ \x1b[0m msg xx'), - ([{'abc': {'path': '/root/repo/', 'type': '', 'flags': []}}, True], - True, 'abc repo *+_ msg xx'), - ([{'repo': {'path': '/root/repo2/', 'type': '', 'flags': []}}, False], - False, 'repo \x1b[32mrepo _ \x1b[0m msg xx'), -]) +@pytest.mark.parametrize( + "test_input, diff_return, expected", + [ + ( + [{"abc": {"path": "/root/repo/", "type": "", "flags": []}}, False], + True, + "abc \x1b[31mrepo *+_ \x1b[0m msg xx", + ), + ( + [{"abc": {"path": "/root/repo/", "type": "", "flags": []}}, True], + True, + "abc repo *+_ msg xx", + ), + ( + [{"repo": {"path": "/root/repo2/", "type": "", "flags": []}}, False], + False, + "repo \x1b[32mrepo _ \x1b[0m msg xx", + ), + ], +) def test_describe(test_input, diff_return, expected, monkeypatch): - monkeypatch.setattr(info, 'get_head', lambda x: 'repo') - monkeypatch.setattr(info, 'run_quiet_diff', lambda *_: diff_return) - monkeypatch.setattr(info, 'get_commit_msg', lambda *_: "msg") - monkeypatch.setattr(info, 'get_commit_time', lambda *_: "xx") - monkeypatch.setattr(info, 'has_untracked', lambda *_: True) - monkeypatch.setattr('os.chdir', lambda x: None) + monkeypatch.setattr(info, "get_head", lambda x: "repo") + monkeypatch.setattr(info, "run_quiet_diff", lambda *_: diff_return) + monkeypatch.setattr(info, "get_commit_msg", lambda *_: "msg") + monkeypatch.setattr(info, "get_commit_time", lambda *_: "xx") + monkeypatch.setattr(info, "has_untracked", lambda *_: True) + monkeypatch.setattr("os.chdir", lambda x: None) info.get_color_encoding.cache_clear() # avoid side effect assert expected == next(utils.describe(*test_input)) -@pytest.mark.parametrize('path_fname, expected', [ - (PATH_FNAME, { - 'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []}, - 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []}, - 'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []}, - }), - (PATH_FNAME_EMPTY, {}), - (PATH_FNAME_CLASH, { - 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': ['--haha', '--pp']}, - 'repo1': {'path': '/root/x/repo1', 'type': '', 'flags': []} - }), -]) -@patch('gita.utils.is_git', return_value=True) -@patch('gita.common.get_config_fname') +@pytest.mark.parametrize( + "path_fname, expected", + [ + ( + PATH_FNAME, + { + "repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []}, + "repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []}, + "xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []}, + }, + ), + (PATH_FNAME_EMPTY, {}), + ( + PATH_FNAME_CLASH, + { + "repo2": { + "path": "/e/fgh/repo2", + "type": "", + "flags": ["--haha", "--pp"], + }, + "repo1": {"path": "/root/x/repo1", "type": "", "flags": []}, + }, + ), + ], +) +@patch("gita.utils.is_git", return_value=True) +@patch("gita.common.get_config_fname") def test_get_repos(mock_path_fname, _, path_fname, expected): mock_path_fname.return_value = path_fname utils.get_repos.cache_clear() assert utils.get_repos() == expected -@patch('gita.common.get_config_dir') +@patch("gita.common.get_config_dir") def test_get_context(mock_config_dir): mock_config_dir.return_value = TEST_DIR utils.get_context.cache_clear() - assert utils.get_context() == TEST_DIR / 'xx.context' + assert utils.get_context() == TEST_DIR / "xx.context" - mock_config_dir.return_value = '/' + mock_config_dir.return_value = "/" utils.get_context.cache_clear() assert utils.get_context() == None -@pytest.mark.parametrize('group_fname, expected', [ - (GROUP_FNAME, {'xx': {'repos': ['a', 'b'], 'path': ''}, - 'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}), -]) -@patch('gita.common.get_config_fname') -def test_get_groups(mock_group_fname, group_fname, expected): +@pytest.mark.parametrize( + "group_fname, expected", + [ + ( + GROUP_FNAME, + { + "xx": {"repos": ["a", "b"], "path": ""}, + "yy": {"repos": ["a", "c", "d"], "path": ""}, + }, + ), + ], +) +@patch("gita.common.get_config_fname") +@patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""}) +def test_get_groups(_, mock_group_fname, group_fname, expected): mock_group_fname.return_value = group_fname utils.get_groups.cache_clear() assert utils.get_groups() == expected -@patch('os.path.isfile', return_value=True) -@patch('os.path.getsize', return_value=True) +@patch("os.path.isfile", return_value=True) +@patch("os.path.getsize", return_value=True) def test_custom_push_cmd(*_): - with patch('builtins.open', - mock_open(read_data='{"push":{"cmd":"hand","help":"me","allow_all":true}}')): + with patch( + "builtins.open", + mock_open(read_data='{"push":{"cmd":"hand","help":"me","allow_all":true}}'), + ): cmds = utils.get_cmds_from_files() - assert cmds['push'] == {'cmd': 'hand', 'help': 'me', 'allow_all': True} + assert cmds["push"] == {"cmd": "hand", "help": "me", "allow_all": True} @pytest.mark.parametrize( - 'path_input, expected', + "path_input, expected", [ - (['/home/some/repo'], '/home/some/repo,some/repo,,\r\n'), # add one new - (['/home/some/repo1', '/repo2'], - {'/repo2,repo2,,\r\n', # add two new - '/home/some/repo1,repo1,,\r\n'}), # add two new - (['/home/some/repo1', '/nos/repo'], - '/home/some/repo1,repo1,,\r\n'), # add one old one new - ]) -@patch('os.makedirs') -@patch('gita.utils.is_git', return_value=True) + (["/home/some/repo"], "/home/some/repo,some/repo,,\r\n"), # add one new + ( + ["/home/some/repo1", "/repo2"], + {"/repo2,repo2,,\r\n", "/home/some/repo1,repo1,,\r\n"}, # add two new + ), # add two new + ( + ["/home/some/repo1", "/nos/repo"], + "/home/some/repo1,repo1,,\r\n", + ), # add one old one new + ], +) +@patch("os.makedirs") +@patch("gita.utils.is_git", return_value=True) def test_add_repos(_0, _1, path_input, expected, monkeypatch): - monkeypatch.setenv('XDG_CONFIG_HOME', '/config') - with patch('builtins.open', mock_open()) as mock_file: - utils.add_repos({'repo': {'path': '/nos/repo'}}, path_input) - mock_file.assert_called_with('/config/gita/repos.csv', 'a+', newline='') + monkeypatch.setenv("XDG_CONFIG_HOME", "/config") + with patch("builtins.open", mock_open()) as mock_file: + utils.add_repos({"repo": {"path": "/nos/repo"}}, path_input) + mock_file.assert_called_with("/config/gita/repos.csv", "a+", newline="") handle = mock_file() if type(expected) == str: handle.write.assert_called_once_with(expected) @@ -145,21 +248,22 @@ def test_add_repos(_0, _1, path_input, expected, monkeypatch): assert not kwargs -@patch('gita.utils.write_to_groups_file') -@patch('gita.utils.write_to_repo_file') +@patch("gita.utils.write_to_groups_file") +@patch("gita.utils.write_to_repo_file") def test_rename_repo(mock_write, _): - repos = {'r1': {'path': '/a/b', 'type': None}, - 'r2': {'path': '/c/c', 'type': None}} - utils.rename_repo(repos, 'r2', 'xxx') - mock_write.assert_called_once_with(repos, 'w') + repos = {"r1": {"path": "/a/b", "type": None}, "r2": {"path": "/c/c", "type": None}} + utils.rename_repo(repos, "r2", "xxx") + mock_write.assert_called_once_with(repos, "w") def test_async_output(capfd): tasks = [ - utils.run_async('myrepo', '.', [ - 'python3', '-c', - f"print({i});import time; time.sleep({i});print({i})" - ]) for i in range(4) + utils.run_async( + "myrepo", + ".", + ["python3", "-c", f"print({i});import time; time.sleep({i});print({i})"], + ) + for i in range(4) ] # I don't fully understand why a new loop is needed here. Without a new # loop, "pytest" fails but "pytest tests/test_utils.py" works. Maybe pytest @@ -168,12 +272,15 @@ def test_async_output(capfd): utils.exec_async_tasks(tasks) out, err = capfd.readouterr() - assert err == '' - assert out == 'myrepo: 0\nmyrepo: 0\n\nmyrepo: 1\nmyrepo: 1\n\nmyrepo: 2\nmyrepo: 2\n\nmyrepo: 3\nmyrepo: 3\n\n' + assert err == "" + assert ( + out + == "myrepo: 0\nmyrepo: 0\n\nmyrepo: 1\nmyrepo: 1\n\nmyrepo: 2\nmyrepo: 2\n\nmyrepo: 3\nmyrepo: 3\n\n" + ) def test_is_git(tmpdir): with tmpdir.as_cwd(): - subprocess.run('git init --bare .'.split()) + subprocess.run("git init --bare .".split()) assert utils.is_git(Path.cwd()) is False assert utils.is_git(Path.cwd(), include_bare=True) is True |