import os import stat import tempfile from time import time from unittest.mock import patch import pytest from pymysql import ProgrammingError import mycli.packages.special from .utils import dbtest, db_connection, send_ctrl_c def test_set_get_pager(): mycli.packages.special.set_pager_enabled(True) assert mycli.packages.special.is_pager_enabled() mycli.packages.special.set_pager_enabled(False) assert not mycli.packages.special.is_pager_enabled() mycli.packages.special.set_pager('less') assert os.environ['PAGER'] == "less" mycli.packages.special.set_pager(False) assert os.environ['PAGER'] == "less" del os.environ['PAGER'] mycli.packages.special.set_pager(False) mycli.packages.special.disable_pager() assert not mycli.packages.special.is_pager_enabled() def test_set_get_timing(): mycli.packages.special.set_timing_enabled(True) assert mycli.packages.special.is_timing_enabled() mycli.packages.special.set_timing_enabled(False) assert not mycli.packages.special.is_timing_enabled() def test_set_get_expanded_output(): mycli.packages.special.set_expanded_output(True) assert mycli.packages.special.is_expanded_output() mycli.packages.special.set_expanded_output(False) assert not mycli.packages.special.is_expanded_output() def test_editor_command(): assert mycli.packages.special.editor_command(r'hello\e') assert mycli.packages.special.editor_command(r'\ehello') assert not mycli.packages.special.editor_command(r'hello') assert mycli.packages.special.get_filename(r'\e filename') == "filename" os.environ['EDITOR'] = 'true' os.environ['VISUAL'] = 'true' # Set the editor to Notepad on Windows if os.name != 'nt': mycli.packages.special.open_external_editor(sql=r'select 1') == "select 1" else: pytest.skip('Skipping on Windows platform.') def test_tee_command(): mycli.packages.special.write_tee(u"hello world") # write without file set # keep Windows from locking the file with delete=False with tempfile.NamedTemporaryFile(delete=False) as f: mycli.packages.special.execute(None, u"tee " + f.name) mycli.packages.special.write_tee(u"hello world") if os.name=='nt': assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" mycli.packages.special.execute(None, u"tee -o " + f.name) mycli.packages.special.write_tee(u"hello world") f.seek(0) if os.name=='nt': assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" mycli.packages.special.execute(None, u"notee") mycli.packages.special.write_tee(u"hello world") f.seek(0) if os.name=='nt': assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" # remove temp file # delete=False means we should try to clean up try: if os.path.exists(f.name): os.remove(f.name) except Exception as e: print(f"An error occurred while attempting to delete the file: {e}") def test_tee_command_error(): with pytest.raises(TypeError): mycli.packages.special.execute(None, 'tee') with pytest.raises(OSError): with tempfile.NamedTemporaryFile() as f: os.chmod(f.name, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) mycli.packages.special.execute(None, 'tee {}'.format(f.name)) @dbtest @pytest.mark.skipif(os.name == "nt", reason="Bug: fails on Windows, needs fixing, singleton of FQ not working right") def test_favorite_query(): with db_connection().cursor() as cur: query = u'select "✔"' mycli.packages.special.execute(cur, u'\\fs check {0}'.format(query)) assert next(mycli.packages.special.execute( cur, u'\\f check'))[0] == "> " + query def test_once_command(): with pytest.raises(TypeError): mycli.packages.special.execute(None, u"\\once") with pytest.raises(OSError): mycli.packages.special.execute(None, u"\\once /proc/access-denied") mycli.packages.special.write_once(u"hello world") # write without file set # keep Windows from locking the file with delete=False with tempfile.NamedTemporaryFile(delete=False) as f: mycli.packages.special.execute(None, u"\\once " + f.name) mycli.packages.special.write_once(u"hello world") if os.name=='nt': assert f.read() == b"hello world\r\n" else: assert f.read() == b"hello world\n" mycli.packages.special.execute(None, u"\\once -o " + f.name) mycli.packages.special.write_once(u"hello world line 1") mycli.packages.special.write_once(u"hello world line 2") f.seek(0) if os.name=='nt': assert f.read() == b"hello world line 1\r\nhello world line 2\r\n" else: assert f.read() == b"hello world line 1\nhello world line 2\n" # delete=False means we should try to clean up try: if os.path.exists(f.name): os.remove(f.name) except Exception as e: print(f"An error occurred while attempting to delete the file: {e}") def test_pipe_once_command(): with pytest.raises(IOError): mycli.packages.special.execute(None, u"\\pipe_once") with pytest.raises(OSError): mycli.packages.special.execute( None, u"\\pipe_once /proc/access-denied") if os.name == 'nt': mycli.packages.special.execute(None, u'\\pipe_once python -c "import sys; print(len(sys.stdin.read().strip()))"') mycli.packages.special.write_once(u"hello world") mycli.packages.special.unset_pipe_once_if_written() else: mycli.packages.special.execute(None, u"\\pipe_once wc") mycli.packages.special.write_once(u"hello world") mycli.packages.special.unset_pipe_once_if_written() # how to assert on wc output? def test_parseargfile(): """Test that parseargfile expands the user directory.""" expected = {'file': os.path.join(os.path.expanduser('~'), 'filename'), 'mode': 'a'} if os.name=='nt': assert expected == mycli.packages.special.iocommands.parseargfile( '~\\filename') else: assert expected == mycli.packages.special.iocommands.parseargfile( '~/filename') expected = {'file': os.path.join(os.path.expanduser('~'), 'filename'), 'mode': 'w'} if os.name=='nt': assert expected == mycli.packages.special.iocommands.parseargfile( '-o ~\\filename') else: assert expected == mycli.packages.special.iocommands.parseargfile( '-o ~/filename') def test_parseargfile_no_file(): """Test that parseargfile raises a TypeError if there is no filename.""" with pytest.raises(TypeError): mycli.packages.special.iocommands.parseargfile('') with pytest.raises(TypeError): mycli.packages.special.iocommands.parseargfile('-o ') @dbtest def test_watch_query_iteration(): """Test that a single iteration of the result of `watch_query` executes the desired query and returns the given results.""" expected_value = "1" query = "SELECT {0!s}".format(expected_value) expected_title = '> {0!s}'.format(query) with db_connection().cursor() as cur: result = next(mycli.packages.special.iocommands.watch_query( arg=query, cur=cur )) assert result[0] == expected_title assert result[2][0] == expected_value @dbtest @pytest.mark.skipif(os.name == "nt", reason="Bug: Win handles this differently. May need to refactor watch_query to work for Win") def test_watch_query_full(): """Test that `watch_query`: * Returns the expected results. * Executes the defined times inside the given interval, in this case with a 0.3 seconds wait, it should execute 4 times inside a 1 seconds interval. * Stops at Ctrl-C """ watch_seconds = 0.3 wait_interval = 1 expected_value = "1" query = "SELECT {0!s}".format(expected_value) expected_title = '> {0!s}'.format(query) expected_results = 4 ctrl_c_process = send_ctrl_c(wait_interval) with db_connection().cursor() as cur: results = list( result for result in mycli.packages.special.iocommands.watch_query( arg='{0!s} {1!s}'.format(watch_seconds, query), cur=cur ) ) ctrl_c_process.join(1) assert len(results) == expected_results for result in results: assert result[0] == expected_title assert result[2][0] == expected_value @dbtest @patch('click.clear') def test_watch_query_clear(clear_mock): """Test that the screen is cleared with the -c flag of `watch` command before execute the query.""" with db_connection().cursor() as cur: watch_gen = mycli.packages.special.iocommands.watch_query( arg='0.1 -c select 1;', cur=cur ) assert not clear_mock.called next(watch_gen) assert clear_mock.called clear_mock.reset_mock() next(watch_gen) assert clear_mock.called clear_mock.reset_mock() @dbtest def test_watch_query_bad_arguments(): """Test different incorrect combinations of arguments for `watch` command.""" watch_query = mycli.packages.special.iocommands.watch_query with db_connection().cursor() as cur: with pytest.raises(ProgrammingError): next(watch_query('a select 1;', cur=cur)) with pytest.raises(ProgrammingError): next(watch_query('-a select 1;', cur=cur)) with pytest.raises(ProgrammingError): next(watch_query('1 -a select 1;', cur=cur)) with pytest.raises(ProgrammingError): next(watch_query('-c -a select 1;', cur=cur)) @dbtest @patch('click.clear') def test_watch_query_interval_clear(clear_mock): """Test `watch` command with interval and clear flag.""" def test_asserts(gen): clear_mock.reset_mock() start = time() next(gen) assert clear_mock.called next(gen) exec_time = time() - start assert exec_time > seconds and exec_time < (seconds + seconds) seconds = 1.0 watch_query = mycli.packages.special.iocommands.watch_query with db_connection().cursor() as cur: test_asserts(watch_query('{0!s} -c select 1;'.format(seconds), cur=cur)) test_asserts(watch_query('-c {0!s} select 1;'.format(seconds), cur=cur)) def test_split_sql_by_delimiter(): for delimiter_str in (';', '$', '😀'): mycli.packages.special.set_delimiter(delimiter_str) sql_input = "select 1{} select \ufffc2".format(delimiter_str) queries = ( "select 1", "select \ufffc2" ) for query, parsed_query in zip( queries, mycli.packages.special.split_queries(sql_input)): assert(query == parsed_query) def test_switch_delimiter_within_query(): mycli.packages.special.set_delimiter(';') sql_input = "select 1; delimiter $$ select 2 $$ select 3 $$" queries = ( "select 1", "delimiter $$ select 2 $$ select 3 $$", "select 2", "select 3" ) for query, parsed_query in zip( queries, mycli.packages.special.split_queries(sql_input)): assert(query == parsed_query) def test_set_delimiter(): for delim in ('foo', 'bar'): mycli.packages.special.set_delimiter(delim) assert mycli.packages.special.get_current_delimiter() == delim def teardown_function(): mycli.packages.special.set_delimiter(';')