summaryrefslogtreecommitdiffstats
path: root/tests/test_sqlexecute.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_sqlexecute.py')
-rw-r--r--tests/test_sqlexecute.py405
1 files changed, 405 insertions, 0 deletions
diff --git a/tests/test_sqlexecute.py b/tests/test_sqlexecute.py
new file mode 100644
index 0000000..16bad74
--- /dev/null
+++ b/tests/test_sqlexecute.py
@@ -0,0 +1,405 @@
+# coding=UTF-8
+
+import os
+
+import pytest
+
+from utils import run, dbtest, set_expanded_output, is_expanded_output
+from sqlite3 import OperationalError, ProgrammingError
+
+
+def assert_result_equal(
+ result,
+ title=None,
+ rows=None,
+ headers=None,
+ status=None,
+ auto_status=True,
+ assert_contains=False,
+):
+ """Assert that an sqlexecute.run() result matches the expected values."""
+ if status is None and auto_status and rows:
+ status = "{} row{} in set".format(len(rows), "s" if len(rows) > 1 else "")
+ fields = {"title": title, "rows": rows, "headers": headers, "status": status}
+
+ if assert_contains:
+ # Do a loose match on the results using the *in* operator.
+ for key, field in fields.items():
+ if field:
+ assert field in result[0][key]
+ else:
+ # Do an exact match on the fields.
+ assert result == [fields]
+
+
+@dbtest
+def test_conn(executor):
+ run(executor, """create table test(a text)""")
+ run(executor, """insert into test values('abc')""")
+ results = run(executor, """select * from test""")
+
+ assert_result_equal(results, headers=["a"], rows=[("abc",)])
+
+
+@dbtest
+def test_bools(executor):
+ run(executor, """create table test(a boolean)""")
+ run(executor, """insert into test values(1)""")
+ results = run(executor, """select * from test""")
+
+ assert_result_equal(results, headers=["a"], rows=[(1,)])
+
+
+@dbtest
+def test_binary(executor):
+ run(executor, """create table foo(blb BLOB NOT NULL)""")
+ run(executor, """INSERT INTO foo VALUES ('\x01\x01\x01\n')""")
+ results = run(executor, """select * from foo""")
+
+ expected = "\x01\x01\x01\n"
+
+ assert_result_equal(results, headers=["blb"], rows=[(expected,)])
+
+
+## Failing in Travis for some unknown reason.
+# @dbtest
+# def test_table_and_columns_query(executor):
+# run(executor, "create table a(x text, y text)")
+# run(executor, "create table b(z text)")
+
+# assert set(executor.tables()) == set([("a",), ("b",)])
+# assert set(executor.table_columns()) == set([("a", "x"), ("a", "y"), ("b", "z")])
+
+
+@dbtest
+def test_database_list(executor):
+ databases = executor.databases()
+ assert "main" in list(databases)
+
+
+@dbtest
+def test_invalid_syntax(executor):
+ with pytest.raises(OperationalError) as excinfo:
+ run(executor, "invalid syntax!")
+ assert "syntax error" in str(excinfo.value)
+
+
+@dbtest
+def test_invalid_column_name(executor):
+ with pytest.raises(OperationalError) as excinfo:
+ run(executor, "select invalid command")
+ assert "no such column: invalid" in str(excinfo.value)
+
+
+@dbtest
+def test_unicode_support_in_output(executor):
+ run(executor, "create table unicodechars(t text)")
+ run(executor, "insert into unicodechars (t) values ('é')")
+
+ # See issue #24, this raises an exception without proper handling
+ results = run(executor, "select * from unicodechars")
+ assert_result_equal(results, headers=["t"], rows=[("é",)])
+
+
+@dbtest
+def test_invalid_unicode_values_dont_choke(executor):
+ run(executor, "create table unicodechars(t text)")
+ # \xc3 is not a valid utf-8 char. But we can insert it into the database
+ # which can break querying if not handled correctly.
+ run(executor, "insert into unicodechars (t) values (cast(x'c3' as text))")
+
+ results = run(executor, "select * from unicodechars")
+ assert_result_equal(results, headers=["t"], rows=[("\\xc3",)])
+
+
+@dbtest
+def test_multiple_queries_same_line(executor):
+ results = run(executor, "select 'foo'; select 'bar'")
+
+ expected = [
+ {
+ "title": None,
+ "headers": ["'foo'"],
+ "rows": [("foo",)],
+ "status": "1 row in set",
+ },
+ {
+ "title": None,
+ "headers": ["'bar'"],
+ "rows": [("bar",)],
+ "status": "1 row in set",
+ },
+ ]
+ assert expected == results
+
+
+@dbtest
+def test_multiple_queries_same_line_syntaxerror(executor):
+ with pytest.raises(OperationalError) as excinfo:
+ run(executor, "select 'foo'; invalid syntax")
+ assert "syntax error" in str(excinfo.value)
+
+
+@dbtest
+def test_favorite_query(executor):
+ set_expanded_output(False)
+ run(executor, "create table test(a text)")
+ run(executor, "insert into test values('abc')")
+ run(executor, "insert into test values('def')")
+
+ results = run(executor, "\\fs test-a select * from test where a like 'a%'")
+ assert_result_equal(results, status="Saved.")
+
+ results = run(executor, "\\f+ test-a")
+ assert_result_equal(
+ results,
+ title="> select * from test where a like 'a%'",
+ headers=["a"],
+ rows=[("abc",)],
+ auto_status=False,
+ )
+
+ results = run(executor, "\\fd test-a")
+ assert_result_equal(results, status="test-a: Deleted")
+
+
+@dbtest
+def test_bind_parameterized_favorite_query(executor):
+ set_expanded_output(False)
+ run(executor, "create table test(name text, id integer)")
+ run(executor, "insert into test values('def', 2)")
+ run(executor, "insert into test values('two words', 3)")
+
+ results = run(executor, "\\fs q_param select * from test where name=?")
+ assert_result_equal(results, status="Saved.")
+
+ results = run(executor, "\\f+ q_param def")
+ assert_result_equal(
+ results,
+ title="> select * from test where name=?",
+ headers=["name", "id"],
+ rows=[("def", 2)],
+ auto_status=False,
+ )
+
+ results = run(executor, "\\f+ q_param 'two words'")
+ assert_result_equal(
+ results,
+ title="> select * from test where name=?",
+ headers=["name", "id"],
+ rows=[("two words", 3)],
+ auto_status=False,
+ )
+
+ with pytest.raises(ProgrammingError):
+ results = run(executor, "\\f+ q_param")
+
+ with pytest.raises(ProgrammingError):
+ results = run(executor, "\\f+ q_param 1 2")
+
+
+@dbtest
+def test_verbose_feature_of_favorite_query(executor):
+ set_expanded_output(False)
+ run(executor, "create table test(a text, id integer)")
+ run(executor, "insert into test values('abc', 1)")
+ run(executor, "insert into test values('def', 2)")
+
+ results = run(executor, "\\fs sh_param select * from test where id=$1")
+ assert_result_equal(results, status="Saved.")
+
+ results = run(executor, "\\f sh_param 1")
+ assert_result_equal(
+ results,
+ title=None,
+ headers=["a", "id"],
+ rows=[("abc", 1)],
+ auto_status=False,
+ )
+
+ results = run(executor, "\\f+ sh_param 1")
+ assert_result_equal(
+ results,
+ title="> select * from test where id=1",
+ headers=["a", "id"],
+ rows=[("abc", 1)],
+ auto_status=False,
+ )
+
+
+@dbtest
+def test_shell_parameterized_favorite_query(executor):
+ set_expanded_output(False)
+ run(executor, "create table test(a text, id integer)")
+ run(executor, "insert into test values('abc', 1)")
+ run(executor, "insert into test values('def', 2)")
+
+ results = run(executor, "\\fs sh_param select * from test where id=$1")
+ assert_result_equal(results, status="Saved.")
+
+ results = run(executor, "\\f+ sh_param 1")
+ assert_result_equal(
+ results,
+ title="> select * from test where id=1",
+ headers=["a", "id"],
+ rows=[("abc", 1)],
+ auto_status=False,
+ )
+
+ results = run(executor, "\\f+ sh_param")
+ assert_result_equal(
+ results,
+ title=None,
+ headers=None,
+ rows=None,
+ status="missing substitution for $1 in query:\n select * from test where id=$1",
+ )
+
+ results = run(executor, "\\f+ sh_param 1 2")
+ assert_result_equal(
+ results,
+ title=None,
+ headers=None,
+ rows=None,
+ status="Too many arguments.\nQuery does not have enough place holders to substitute.\nselect * from test where id=1",
+ )
+
+
+@dbtest
+def test_favorite_query_multiple_statement(executor):
+ set_expanded_output(False)
+ run(executor, "create table test(a text)")
+ run(executor, "insert into test values('abc')")
+ run(executor, "insert into test values('def')")
+
+ results = run(
+ executor,
+ "\\fs test-ad select * from test where a like 'a%'; "
+ "select * from test where a like 'd%'",
+ )
+ assert_result_equal(results, status="Saved.")
+
+ results = run(executor, "\\f+ test-ad")
+ expected = [
+ {
+ "title": "> select * from test where a like 'a%'",
+ "headers": ["a"],
+ "rows": [("abc",)],
+ "status": None,
+ },
+ {
+ "title": "> select * from test where a like 'd%'",
+ "headers": ["a"],
+ "rows": [("def",)],
+ "status": None,
+ },
+ ]
+ assert expected == results
+
+ results = run(executor, "\\fd test-ad")
+ assert_result_equal(results, status="test-ad: Deleted")
+
+
+@dbtest
+def test_favorite_query_expanded_output(executor):
+ set_expanded_output(False)
+ run(executor, """create table test(a text)""")
+ run(executor, """insert into test values('abc')""")
+
+ results = run(executor, "\\fs test-ae select * from test")
+ assert_result_equal(results, status="Saved.")
+
+ results = run(executor, "\\f+ test-ae \G")
+ assert is_expanded_output() is True
+ assert_result_equal(
+ results,
+ title="> select * from test",
+ headers=["a"],
+ rows=[("abc",)],
+ auto_status=False,
+ )
+
+ set_expanded_output(False)
+
+ results = run(executor, "\\fd test-ae")
+ assert_result_equal(results, status="test-ae: Deleted")
+
+
+@dbtest
+def test_special_command(executor):
+ results = run(executor, "\\?")
+ assert_result_equal(
+ results,
+ rows=("quit", "\\q", "Quit."),
+ headers="Command",
+ assert_contains=True,
+ auto_status=False,
+ )
+
+
+@dbtest
+def test_cd_command_without_a_folder_name(executor):
+ results = run(executor, "system cd")
+ assert_result_equal(results, status="No folder name was provided.")
+
+
+@dbtest
+def test_system_command_not_found(executor):
+ results = run(executor, "system xyz")
+ assert_result_equal(
+ results, status="OSError: No such file or directory", assert_contains=True
+ )
+
+
+@dbtest
+def test_system_command_output(executor):
+ test_dir = os.path.abspath(os.path.dirname(__file__))
+ test_file_path = os.path.join(test_dir, "test.txt")
+ results = run(executor, "system cat {0}".format(test_file_path))
+ assert_result_equal(results, status="litecli is awesome!\n")
+
+
+@dbtest
+def test_cd_command_current_dir(executor):
+ test_path = os.path.abspath(os.path.dirname(__file__))
+ run(executor, "system cd {0}".format(test_path))
+ assert os.getcwd() == test_path
+ run(executor, "system cd ..")
+
+
+@dbtest
+def test_unicode_support(executor):
+ results = run(executor, "SELECT '日本語' AS japanese;")
+ assert_result_equal(results, headers=["japanese"], rows=[("日本語",)])
+
+
+@dbtest
+def test_timestamp_null(executor):
+ run(executor, """create table ts_null(a timestamp null)""")
+ run(executor, """insert into ts_null values(null)""")
+ results = run(executor, """select * from ts_null""")
+ assert_result_equal(results, headers=["a"], rows=[(None,)])
+
+
+@dbtest
+def test_datetime_null(executor):
+ run(executor, """create table dt_null(a datetime null)""")
+ run(executor, """insert into dt_null values(null)""")
+ results = run(executor, """select * from dt_null""")
+ assert_result_equal(results, headers=["a"], rows=[(None,)])
+
+
+@dbtest
+def test_date_null(executor):
+ run(executor, """create table date_null(a date null)""")
+ run(executor, """insert into date_null values(null)""")
+ results = run(executor, """select * from date_null""")
+ assert_result_equal(results, headers=["a"], rows=[(None,)])
+
+
+@dbtest
+def test_time_null(executor):
+ run(executor, """create table time_null(a time null)""")
+ run(executor, """insert into time_null values(null)""")
+ results = run(executor, """select * from time_null""")
+ assert_result_equal(results, headers=["a"], rows=[(None,)])