summaryrefslogtreecommitdiffstats
path: root/tests/test_main.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_main.py')
-rw-r--r--tests/test_main.py120
1 files changed, 104 insertions, 16 deletions
diff --git a/tests/test_main.py b/tests/test_main.py
index c48accb..9b3a84b 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -61,16 +61,47 @@ def test_format_output():
)
expected = [
"Title",
- "+---------+---------+",
- "| head1 | head2 |",
- "|---------+---------|",
- "| abc | def |",
- "+---------+---------+",
+ "+-------+-------+",
+ "| head1 | head2 |",
+ "|-------+-------|",
+ "| abc | def |",
+ "+-------+-------+",
"test status",
]
assert list(results) == expected
+def test_format_output_truncate_on():
+ settings = OutputSettings(
+ table_format="psql", dcmlfmt="d", floatfmt="g", max_field_width=10
+ )
+ results = format_output(
+ None,
+ [("first field value", "second field value")],
+ ["head1", "head2"],
+ None,
+ settings,
+ )
+ expected = [
+ "+------------+------------+",
+ "| head1 | head2 |",
+ "|------------+------------|",
+ "| first f... | second ... |",
+ "+------------+------------+",
+ ]
+ assert list(results) == expected
+
+
+def test_format_output_truncate_off():
+ settings = OutputSettings(
+ table_format="psql", dcmlfmt="d", floatfmt="g", max_field_width=None
+ )
+ long_field_value = ("first field " * 100).strip()
+ results = format_output(None, [(long_field_value,)], ["head1"], None, settings)
+ lines = list(results)
+ assert lines[3] == f"| {long_field_value} |"
+
+
@dbtest
def test_format_array_output(executor):
statement = """
@@ -83,12 +114,12 @@ def test_format_array_output(executor):
"""
results = run(executor, statement)
expected = [
- "+----------------+------------------------+--------------+",
- "| bigint_array | nested_numeric_array | 配列 |",
- "|----------------+------------------------+--------------|",
- "| {1,2,3} | {{1,2},{3,4}} | {å,魚,текст} |",
- "| {} | <null> | {<null>} |",
- "+----------------+------------------------+--------------+",
+ "+--------------+----------------------+--------------+",
+ "| bigint_array | nested_numeric_array | 配列 |",
+ "|--------------+----------------------+--------------|",
+ "| {1,2,3} | {{1,2},{3,4}} | {å,魚,текст} |",
+ "| {} | <null> | {<null>} |",
+ "+--------------+----------------------+--------------+",
"SELECT 2",
]
assert list(results) == expected
@@ -128,11 +159,11 @@ def test_format_output_auto_expand():
)
table = [
"Title",
- "+---------+---------+",
- "| head1 | head2 |",
- "|---------+---------|",
- "| abc | def |",
- "+---------+---------+",
+ "+-------+-------+",
+ "| head1 | head2 |",
+ "|-------+-------|",
+ "| abc | def |",
+ "+-------+-------+",
"test status",
]
assert list(table_results) == table
@@ -266,6 +297,63 @@ def test_i_works(tmpdir, executor):
run(executor, statement, pgspecial=cli.pgspecial)
+@dbtest
+def test_watch_works(executor):
+ cli = PGCli(pgexecute=executor)
+
+ def run_with_watch(
+ query, target_call_count=1, expected_output="", expected_timing=None
+ ):
+ """
+ :param query: Input to the CLI
+ :param target_call_count: Number of times the user lets the command run before Ctrl-C
+ :param expected_output: Substring expected to be found for each executed query
+ :param expected_timing: value `time.sleep` expected to be called with on every invocation
+ """
+ with mock.patch.object(cli, "echo_via_pager") as mock_echo, mock.patch(
+ "pgcli.main.sleep"
+ ) as mock_sleep:
+ mock_sleep.side_effect = [None] * (target_call_count - 1) + [
+ KeyboardInterrupt
+ ]
+ cli.handle_watch_command(query)
+ # Validate that sleep was called with the right timing
+ for i in range(target_call_count - 1):
+ assert mock_sleep.call_args_list[i][0][0] == expected_timing
+ # Validate that the output of the query was expected
+ assert mock_echo.call_count == target_call_count
+ for i in range(target_call_count):
+ assert expected_output in mock_echo.call_args_list[i][0][0]
+
+ # With no history, it errors.
+ with mock.patch("pgcli.main.click.secho") as mock_secho:
+ cli.handle_watch_command(r"\watch 2")
+ mock_secho.assert_called()
+ assert (
+ r"\watch cannot be used with an empty query"
+ in mock_secho.call_args_list[0][0][0]
+ )
+
+ # Usage 1: Run a query and then re-run it with \watch across two prompts.
+ run_with_watch("SELECT 111", expected_output="111")
+ run_with_watch(
+ "\\watch 10", target_call_count=2, expected_output="111", expected_timing=10
+ )
+
+ # Usage 2: Run a query and \watch via the same prompt.
+ run_with_watch(
+ "SELECT 222; \\watch 4",
+ target_call_count=3,
+ expected_output="222",
+ expected_timing=4,
+ )
+
+ # Usage 3: Re-run the last watched command with a new timing
+ run_with_watch(
+ "\\watch 5", target_call_count=4, expected_output="222", expected_timing=5
+ )
+
+
def test_missing_rc_dir(tmpdir):
rcfile = str(tmpdir.join("subdir").join("rcfile"))