summaryrefslogtreecommitdiffstats
path: root/tests/test_utils.py
blob: 312804154e91d2126d06bb80ca94f12b22012c09 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import pytest
import asyncio
from unittest.mock import patch, mock_open

from gita import utils, info
from conftest import (
    PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME,
)


@pytest.mark.parametrize('test_input, diff_return, expected', [
    ({
        'abc': '/root/repo/'
    }, True, 'abc \x1b[31mrepo *+_  \x1b[0m msg'),
    ({
        'repo': '/root/repo2/'
    }, False, 'repo \x1b[32mrepo _    \x1b[0m msg'),
])
def test_describe(test_input, diff_return, expected, monkeypatch):
    """Check the first item yielded by ``utils.describe`` for a single repo.

    The ``info`` helpers and ``os.chdir`` are replaced with stubs returning
    fixed values; ``diff_return`` is the value the stubbed
    ``info.run_quiet_diff`` reports for the parametrized case.
    """
    monkeypatch.setattr(info, 'get_head', lambda x: 'repo')
    monkeypatch.setattr(info, 'run_quiet_diff', lambda _: diff_return)
    monkeypatch.setattr(info, 'get_commit_msg', lambda _: "msg")
    monkeypatch.setattr(info, 'has_untracked', lambda: True)
    monkeypatch.setattr('os.chdir', lambda x: None)
    print('expected: ', repr(expected))
    # Evaluate once so the value printed for debugging is exactly the value
    # that gets asserted (previously describe() was run twice).
    got = next(utils.describe(test_input))
    print('got:      ', repr(got))
    assert expected == got


@pytest.mark.parametrize('path_fname, expected', [
    (
        PATH_FNAME,
        {
            'repo1': '/a/bcd/repo1',
            'repo2': '/e/fgh/repo2',
            'xxx': '/a/b/c/repo3',
        },
    ),
    (
        PATH_FNAME_EMPTY,
        {},
    ),
    (
        PATH_FNAME_CLASH,
        {
            'repo1': '/a/bcd/repo1',
            'repo2': '/e/fgh/repo2',
            'x/repo1': '/root/x/repo1',
        },
    ),
])
@patch('gita.utils.is_git', return_value=True)
@patch('gita.utils.get_config_fname')
def test_get_repos(mock_path_fname, _, path_fname, expected):
    """``utils.get_repos`` returns the expected mapping for each fixture file.

    ``get_config_fname`` is patched to return the parametrized fixture path
    and ``is_git`` is patched to always return True.
    """
    # Drop any cached result before calling get_repos for this case.
    utils.get_repos.cache_clear()
    mock_path_fname.return_value = path_fname
    repos = utils.get_repos()
    assert repos == expected


@pytest.mark.parametrize('group_fname, expected', [
    (
        GROUP_FNAME,
        {
            'xx': ['a', 'b'],
            'yy': ['a', 'c', 'd'],
        },
    ),
])
@patch('gita.utils.get_config_fname')
def test_get_groups(mock_group_fname, group_fname, expected):
    """``utils.get_groups`` returns the expected mapping for the fixture file.

    ``get_config_fname`` is patched to return the parametrized fixture path.
    """
    # Drop any cached result before calling get_groups for this case.
    utils.get_groups.cache_clear()
    mock_group_fname.return_value = group_fname
    groups = utils.get_groups()
    assert groups == expected


@patch('os.path.isfile', return_value=True)
@patch('os.path.getsize', return_value=True)
def test_custom_push_cmd(*_):
    """A ``push`` entry read through the mocked ``open`` shows up in the
    result of ``utils.get_cmds_from_files``.

    ``os.path.isfile`` and ``os.path.getsize`` are patched to return True,
    and ``builtins.open`` serves the fixed text below.
    """
    fake_open = mock_open(read_data='push:\n  cmd: hand\n  help: me')
    with patch('builtins.open', fake_open):
        cmds = utils.get_cmds_from_files()
    push_cmd = cmds['push']
    assert push_cmd == {'cmd': 'hand', 'help': 'me'}


@pytest.mark.parametrize(
    'path_input, expected',
    [
        (['/home/some/repo/'], '/home/some/repo,repo\n'),  # add one new
        (['/home/some/repo1', '/repo2'],
         {'/repo2,repo2\n/home/some/repo1,repo1\n',  # add two new
          '/home/some/repo1,repo1\n/repo2,repo2\n'}),  # add two new
        (['/home/some/repo1', '/nos/repo'],
         '/home/some/repo1,repo1\n'),  # add one old one new
    ])
@patch('os.makedirs')
@patch('gita.utils.is_git', return_value=True)
def test_add_repos(_0, _1, path_input, expected, monkeypatch):
    """Check what ``utils.add_repos`` writes to the repo path file.

    ``expected`` is either the exact string that must be written, or a set of
    strings of which any one is accepted. ``builtins.open`` is mocked, so
    nothing is written to disk; ``XDG_CONFIG_HOME`` is set to ``/config`` so
    the opened path can be asserted.
    """
    monkeypatch.setenv('XDG_CONFIG_HOME', '/config')
    with patch('builtins.open', mock_open()) as mock_file:
        utils.add_repos({'repo': '/nos/repo'}, path_input)
    mock_file.assert_called_with('/config/gita/repo_path', 'a+')
    # Calling the mock returns the shared file handle whose write() was used.
    handle = mock_file()
    if isinstance(expected, str):
        handle.write.assert_called_once_with(expected)
    else:
        # Several outputs are acceptable: a single positional-only write
        # whose argument is one of the listed strings.
        handle.write.assert_called_once()
        args, kwargs = handle.write.call_args
        assert args[0] in expected
        assert not kwargs


@patch('gita.utils.write_to_repo_file')
def test_rename_repo(mock_write):
    """Renaming ``r2`` to ``xxx`` passes the re-keyed mapping and mode ``'w'``
    to the patched ``write_to_repo_file`` exactly once.
    """
    repos = {'r1': '/a/b', 'r2': '/c/c'}
    renamed = {'r1': '/a/b', 'xxx': '/c/c'}
    utils.rename_repo(repos, 'r2', 'xxx')
    mock_write.assert_called_once_with(renamed, 'w')


def test_async_output(capfd):
    """Run four subprocess tasks through ``utils.exec_async_tasks`` and check
    the captured stdout/stderr.

    Task ``i`` prints ``i``, sleeps ``i`` seconds, then prints ``i`` again.
    """
    tasks = []
    for i in range(4):
        cmd = [
            'python3', '-c',
            f"print({i});import time; time.sleep({i});print({i})",
        ]
        tasks.append(utils.run_async('myrepo', '.', cmd))
    # A fresh event loop is installed before executing the tasks. The reason
    # it is required is not fully understood: without it, running "pytest"
    # fails while "pytest tests/test_utils.py" works. Possibly pytest itself
    # (or pytest-xdist) uses asyncio -- unconfirmed.
    fresh_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh_loop)
    utils.exec_async_tasks(tasks)

    out, err = capfd.readouterr()
    assert err == ''
    assert out == (
        'myrepo: 0\nmyrepo: 0\n\n'
        'myrepo: 1\nmyrepo: 1\n\n'
        'myrepo: 2\nmyrepo: 2\n\n'
        'myrepo: 3\nmyrepo: 3\n\n'
    )