diff options
Diffstat (limited to 'test/test_special_iocommands.py')
-rw-r--r-- | test/test_special_iocommands.py | 198 |
1 files changed, 85 insertions, 113 deletions
diff --git a/test/test_special_iocommands.py b/test/test_special_iocommands.py index d0ca45f..bea5620 100644 --- a/test/test_special_iocommands.py +++ b/test/test_special_iocommands.py @@ -17,11 +17,11 @@ def test_set_get_pager(): assert mycli.packages.special.is_pager_enabled() mycli.packages.special.set_pager_enabled(False) assert not mycli.packages.special.is_pager_enabled() - mycli.packages.special.set_pager('less') - assert os.environ['PAGER'] == "less" + mycli.packages.special.set_pager("less") + assert os.environ["PAGER"] == "less" mycli.packages.special.set_pager(False) - assert os.environ['PAGER'] == "less" - del os.environ['PAGER'] + assert os.environ["PAGER"] == "less" + del os.environ["PAGER"] mycli.packages.special.set_pager(False) mycli.packages.special.disable_pager() assert not mycli.packages.special.is_pager_enabled() @@ -42,45 +42,44 @@ def test_set_get_expanded_output(): def test_editor_command(): - assert mycli.packages.special.editor_command(r'hello\e') - assert mycli.packages.special.editor_command(r'\ehello') - assert not mycli.packages.special.editor_command(r'hello') + assert mycli.packages.special.editor_command(r"hello\e") + assert mycli.packages.special.editor_command(r"\ehello") + assert not mycli.packages.special.editor_command(r"hello") - assert mycli.packages.special.get_filename(r'\e filename') == "filename" + assert mycli.packages.special.get_filename(r"\e filename") == "filename" - os.environ['EDITOR'] = 'true' - os.environ['VISUAL'] = 'true' + os.environ["EDITOR"] = "true" + os.environ["VISUAL"] = "true" # Set the editor to Notepad on Windows - if os.name != 'nt': - mycli.packages.special.open_external_editor(sql=r'select 1') == "select 1" + if os.name != "nt": + mycli.packages.special.open_external_editor(sql=r"select 1") == "select 1" else: - pytest.skip('Skipping on Windows platform.') - + pytest.skip("Skipping on Windows platform.") def test_tee_command(): - mycli.packages.special.write_tee(u"hello world") # write without file set + mycli.packages.special.write_tee("hello world") # write without file set # keep Windows from locking the file with delete=False with tempfile.NamedTemporaryFile(delete=False) as f: - mycli.packages.special.execute(None, u"tee " + f.name) - mycli.packages.special.write_tee(u"hello world") - if os.name=='nt': + mycli.packages.special.execute(None, "tee " + f.name) + mycli.packages.special.write_tee("hello world") + if os.name == "nt": assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" - mycli.packages.special.execute(None, u"tee -o " + f.name) - mycli.packages.special.write_tee(u"hello world") + mycli.packages.special.execute(None, "tee -o " + f.name) + mycli.packages.special.write_tee("hello world") f.seek(0) - if os.name=='nt': + if os.name == "nt": assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" - mycli.packages.special.execute(None, u"notee") - mycli.packages.special.write_tee(u"hello world") + mycli.packages.special.execute(None, "notee") + mycli.packages.special.write_tee("hello world") f.seek(0) - if os.name=='nt': + if os.name == "nt": assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" @@ -92,52 +91,49 @@ def test_tee_command(): os.remove(f.name) except Exception as e: print(f"An error occurred while attempting to delete the file: {e}") - def test_tee_command_error(): with pytest.raises(TypeError): - mycli.packages.special.execute(None, 'tee') + mycli.packages.special.execute(None, "tee") with pytest.raises(OSError): with tempfile.NamedTemporaryFile() as f: os.chmod(f.name, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) - mycli.packages.special.execute(None, 'tee {}'.format(f.name)) + mycli.packages.special.execute(None, "tee {}".format(f.name)) @dbtest - @pytest.mark.skipif(os.name == "nt", reason="Bug: fails on Windows, needs fixing, singleton of FQ not working right") def test_favorite_query(): with db_connection().cursor() as cur: - query = u'select "✔"' - mycli.packages.special.execute(cur, u'\\fs check {0}'.format(query)) - assert next(mycli.packages.special.execute( - cur, u'\\f check'))[0] == "> " + query + query = 'select "✔"' + mycli.packages.special.execute(cur, "\\fs check {0}".format(query)) + assert next(mycli.packages.special.execute(cur, "\\f check"))[0] == "> " + query def test_once_command(): with pytest.raises(TypeError): - mycli.packages.special.execute(None, u"\\once") + mycli.packages.special.execute(None, "\\once") with pytest.raises(OSError): - mycli.packages.special.execute(None, u"\\once /proc/access-denied") + mycli.packages.special.execute(None, "\\once /proc/access-denied") - mycli.packages.special.write_once(u"hello world") # write without file set + mycli.packages.special.write_once("hello world") # write without file set # keep Windows from locking the file with delete=False with tempfile.NamedTemporaryFile(delete=False) as f: - mycli.packages.special.execute(None, u"\\once " + f.name) - mycli.packages.special.write_once(u"hello world") - if os.name=='nt': + mycli.packages.special.execute(None, "\\once " + f.name) + mycli.packages.special.write_once("hello world") + if os.name == "nt": assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" - mycli.packages.special.execute(None, u"\\once -o " + f.name) - mycli.packages.special.write_once(u"hello world line 1") - mycli.packages.special.write_once(u"hello world line 2") + mycli.packages.special.execute(None, "\\once -o " + f.name) + mycli.packages.special.write_once("hello world line 1") + mycli.packages.special.write_once("hello world line 2") f.seek(0) - if os.name=='nt': + if os.name == "nt": assert f.read() == b"hello world line 1\r\nhello world line 2\r\n" else: assert f.read() == b"hello world line 1\nhello world line 2\n" @@ -151,52 +147,47 @@ def test_once_command(): def test_pipe_once_command(): with pytest.raises(IOError): - mycli.packages.special.execute(None, u"\\pipe_once") + mycli.packages.special.execute(None, "\\pipe_once") with pytest.raises(OSError): - mycli.packages.special.execute( - None, u"\\pipe_once /proc/access-denied") + mycli.packages.special.execute(None, "\\pipe_once /proc/access-denied") - if os.name == 'nt': - mycli.packages.special.execute(None, u'\\pipe_once python -c "import sys; print(len(sys.stdin.read().strip()))"') - mycli.packages.special.write_once(u"hello world") + if os.name == "nt": + mycli.packages.special.execute(None, '\\pipe_once python -c "import sys; print(len(sys.stdin.read().strip()))"') + mycli.packages.special.write_once("hello world") mycli.packages.special.unset_pipe_once_if_written() else: - mycli.packages.special.execute(None, u"\\pipe_once wc") - mycli.packages.special.write_once(u"hello world") - mycli.packages.special.unset_pipe_once_if_written() - # how to assert on wc output? + with tempfile.NamedTemporaryFile() as f: + mycli.packages.special.execute(None, "\\pipe_once tee " + f.name) + mycli.packages.special.write_pipe_once("hello world") + mycli.packages.special.unset_pipe_once_if_written() + f.seek(0) + assert f.read() == b"hello world\n" def test_parseargfile(): """Test that parseargfile expands the user directory.""" - expected = {'file': os.path.join(os.path.expanduser('~'), 'filename'), - 'mode': 'a'} - - if os.name=='nt': - assert expected == mycli.packages.special.iocommands.parseargfile( - '~\\filename') + expected = {"file": os.path.join(os.path.expanduser("~"), "filename"), "mode": "a"} + + if os.name == "nt": + assert expected == mycli.packages.special.iocommands.parseargfile("~\\filename") else: - assert expected == mycli.packages.special.iocommands.parseargfile( - '~/filename') - - expected = {'file': os.path.join(os.path.expanduser('~'), 'filename'), - 'mode': 'w'} - if os.name=='nt': - assert expected == mycli.packages.special.iocommands.parseargfile( - '-o ~\\filename') + assert expected == mycli.packages.special.iocommands.parseargfile("~/filename") + + expected = {"file": os.path.join(os.path.expanduser("~"), "filename"), "mode": "w"} + if os.name == "nt": + assert expected == mycli.packages.special.iocommands.parseargfile("-o ~\\filename") else: - assert expected == mycli.packages.special.iocommands.parseargfile( - '-o ~/filename') + assert expected == mycli.packages.special.iocommands.parseargfile("-o ~/filename") def test_parseargfile_no_file(): """Test that parseargfile raises a TypeError if there is no filename.""" with pytest.raises(TypeError): - mycli.packages.special.iocommands.parseargfile('') + mycli.packages.special.iocommands.parseargfile("") with pytest.raises(TypeError): - mycli.packages.special.iocommands.parseargfile('-o ') + mycli.packages.special.iocommands.parseargfile("-o ") @dbtest @@ -205,11 +196,9 @@ def test_watch_query_iteration(): the desired query and returns the given results.""" expected_value = "1" query = "SELECT {0!s}".format(expected_value) - expected_title = '> {0!s}'.format(query) + expected_title = "> {0!s}".format(query) with db_connection().cursor() as cur: - result = next(mycli.packages.special.iocommands.watch_query( - arg=query, cur=cur - )) + result = next(mycli.packages.special.iocommands.watch_query(arg=query, cur=cur)) assert result[0] == expected_title assert result[2][0] == expected_value @@ -230,14 +219,12 @@ def test_watch_query_full(): wait_interval = 1 expected_value = "1" query = "SELECT {0!s}".format(expected_value) - expected_title = '> {0!s}'.format(query) + expected_title = "> {0!s}".format(query) expected_results = 4 ctrl_c_process = send_ctrl_c(wait_interval) with db_connection().cursor() as cur: results = list( - result for result in mycli.packages.special.iocommands.watch_query( - arg='{0!s} {1!s}'.format(watch_seconds, query), cur=cur - ) + result for result in mycli.packages.special.iocommands.watch_query(arg="{0!s} {1!s}".format(watch_seconds, query), cur=cur) ) ctrl_c_process.join(1) assert len(results) == expected_results @@ -247,14 +234,12 @@ def test_watch_query_full(): @dbtest -@patch('click.clear') +@patch("click.clear") def test_watch_query_clear(clear_mock): """Test that the screen is cleared with the -c flag of `watch` command before execute the query.""" with db_connection().cursor() as cur: - watch_gen = mycli.packages.special.iocommands.watch_query( - arg='0.1 -c select 1;', cur=cur - ) + watch_gen = mycli.packages.special.iocommands.watch_query(arg="0.1 -c select 1;", cur=cur) assert not clear_mock.called next(watch_gen) assert clear_mock.called @@ -271,19 +256,20 @@ def test_watch_query_bad_arguments(): watch_query = mycli.packages.special.iocommands.watch_query with db_connection().cursor() as cur: with pytest.raises(ProgrammingError): - next(watch_query('a select 1;', cur=cur)) + next(watch_query("a select 1;", cur=cur)) with pytest.raises(ProgrammingError): - next(watch_query('-a select 1;', cur=cur)) + next(watch_query("-a select 1;", cur=cur)) with pytest.raises(ProgrammingError): - next(watch_query('1 -a select 1;', cur=cur)) + next(watch_query("1 -a select 1;", cur=cur)) with pytest.raises(ProgrammingError): - next(watch_query('-c -a select 1;', cur=cur)) + next(watch_query("-c -a select 1;", cur=cur)) @dbtest -@patch('click.clear') +@patch("click.clear") def test_watch_query_interval_clear(clear_mock): """Test `watch` command with interval and clear flag.""" + def test_asserts(gen): clear_mock.reset_mock() start = time() @@ -296,46 +282,32 @@ def test_watch_query_interval_clear(clear_mock): seconds = 1.0 watch_query = mycli.packages.special.iocommands.watch_query with db_connection().cursor() as cur: - test_asserts(watch_query('{0!s} -c select 1;'.format(seconds), - cur=cur)) - test_asserts(watch_query('-c {0!s} select 1;'.format(seconds), - cur=cur)) + test_asserts(watch_query("{0!s} -c select 1;".format(seconds), cur=cur)) + test_asserts(watch_query("-c {0!s} select 1;".format(seconds), cur=cur)) def test_split_sql_by_delimiter(): - for delimiter_str in (';', '$', '😀'): + for delimiter_str in (";", "$", "😀"): mycli.packages.special.set_delimiter(delimiter_str) sql_input = "select 1{} select \ufffc2".format(delimiter_str) - queries = ( - "select 1", - "select \ufffc2" - ) - for query, parsed_query in zip( - queries, mycli.packages.special.split_queries(sql_input)): - assert(query == parsed_query) + queries = ("select 1", "select \ufffc2") + for query, parsed_query in zip(queries, mycli.packages.special.split_queries(sql_input)): + assert query == parsed_query def test_switch_delimiter_within_query(): - mycli.packages.special.set_delimiter(';') + mycli.packages.special.set_delimiter(";") sql_input = "select 1; delimiter $$ select 2 $$ select 3 $$" - queries = ( - "select 1", - "delimiter $$ select 2 $$ select 3 $$", - "select 2", - "select 3" - ) - for query, parsed_query in zip( - queries, - mycli.packages.special.split_queries(sql_input)): - assert(query == parsed_query) + queries = ("select 1", "delimiter $$ select 2 $$ select 3 $$", "select 2", "select 3") + for query, parsed_query in zip(queries, mycli.packages.special.split_queries(sql_input)): + assert query == parsed_query def test_set_delimiter(): - - for delim in ('foo', 'bar'): + for delim in ("foo", "bar"): mycli.packages.special.set_delimiter(delim) assert mycli.packages.special.get_current_delimiter() == delim def teardown_function(): - mycli.packages.special.set_delimiter(';') + mycli.packages.special.set_delimiter(";") |