diff options
Diffstat (limited to 'tests/test_utils.py')
-rw-r--r-- | tests/test_utils.py | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..3128041 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,118 @@ +import pytest +import asyncio +from unittest.mock import patch, mock_open + +from gita import utils, info +from conftest import ( + PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME, +) + + +@pytest.mark.parametrize('test_input, diff_return, expected', [ + ({ + 'abc': '/root/repo/' + }, True, 'abc \x1b[31mrepo *+_ \x1b[0m msg'), + ({ + 'repo': '/root/repo2/' + }, False, 'repo \x1b[32mrepo _ \x1b[0m msg'), +]) +def test_describe(test_input, diff_return, expected, monkeypatch): + monkeypatch.setattr(info, 'get_head', lambda x: 'repo') + monkeypatch.setattr(info, 'run_quiet_diff', lambda _: diff_return) + monkeypatch.setattr(info, 'get_commit_msg', lambda _: "msg") + monkeypatch.setattr(info, 'has_untracked', lambda: True) + monkeypatch.setattr('os.chdir', lambda x: None) + print('expected: ', repr(expected)) + print('got: ', repr(next(utils.describe(test_input)))) + assert expected == next(utils.describe(test_input)) + + +@pytest.mark.parametrize('path_fname, expected', [ + (PATH_FNAME, { + 'repo1': '/a/bcd/repo1', + 'repo2': '/e/fgh/repo2', + 'xxx': '/a/b/c/repo3', + }), + (PATH_FNAME_EMPTY, {}), + (PATH_FNAME_CLASH, { + 'repo1': '/a/bcd/repo1', + 'repo2': '/e/fgh/repo2', + 'x/repo1': '/root/x/repo1' + }), +]) +@patch('gita.utils.is_git', return_value=True) +@patch('gita.utils.get_config_fname') +def test_get_repos(mock_path_fname, _, path_fname, expected): + mock_path_fname.return_value = path_fname + utils.get_repos.cache_clear() + assert utils.get_repos() == expected + + +@pytest.mark.parametrize('group_fname, expected', [ + (GROUP_FNAME, {'xx': ['a', 'b'], 'yy': ['a', 'c', 'd']}), +]) +@patch('gita.utils.get_config_fname') +def test_get_groups(mock_group_fname, group_fname, expected): + mock_group_fname.return_value = group_fname + utils.get_groups.cache_clear() + assert utils.get_groups() == expected + + +@patch('os.path.isfile', return_value=True) +@patch('os.path.getsize', return_value=True) +def test_custom_push_cmd(*_): + with patch('builtins.open', + mock_open(read_data='push:\n cmd: hand\n help: me')): + cmds = utils.get_cmds_from_files() + assert cmds['push'] == {'cmd': 'hand', 'help': 'me'} + + +@pytest.mark.parametrize( + 'path_input, expected', + [ + (['/home/some/repo/'], '/home/some/repo,repo\n'), # add one new + (['/home/some/repo1', '/repo2'], + {'/repo2,repo2\n/home/some/repo1,repo1\n', # add two new + '/home/some/repo1,repo1\n/repo2,repo2\n'}), # add two new + (['/home/some/repo1', '/nos/repo'], + '/home/some/repo1,repo1\n'), # add one old one new + ]) +@patch('os.makedirs') +@patch('gita.utils.is_git', return_value=True) +def test_add_repos(_0, _1, path_input, expected, monkeypatch): + monkeypatch.setenv('XDG_CONFIG_HOME', '/config') + with patch('builtins.open', mock_open()) as mock_file: + utils.add_repos({'repo': '/nos/repo'}, path_input) + mock_file.assert_called_with('/config/gita/repo_path', 'a+') + handle = mock_file() + if type(expected) == str: + handle.write.assert_called_once_with(expected) + else: + handle.write.assert_called_once() + args, kwargs = handle.write.call_args + assert args[0] in expected + assert not kwargs + + +@patch('gita.utils.write_to_repo_file') +def test_rename_repo(mock_write): + utils.rename_repo({'r1': '/a/b', 'r2': '/c/c'}, 'r2', 'xxx') + mock_write.assert_called_once_with({'r1': '/a/b', 'xxx': '/c/c'}, 'w') + + +def test_async_output(capfd): + tasks = [ + utils.run_async('myrepo', '.', [ + 'python3', '-c', + f"print({i});import time; time.sleep({i});print({i})" + ]) for i in range(4) + ] + # I don't fully understand why a new loop is needed here. Without a new + # loop, "pytest" fails but "pytest tests/test_utils.py" works. Maybe pytest + # itself uses asyncio (or maybe pytest-xdist)? + asyncio.set_event_loop(asyncio.new_event_loop()) + utils.exec_async_tasks(tasks) + + out, err = capfd.readouterr() + assert err == '' + assert out == 'myrepo: 0\nmyrepo: 0\n\nmyrepo: 1\nmyrepo: 1\n\nmyrepo: 2\nmyrepo: 2\n\nmyrepo: 3\nmyrepo: 3\n\n' |