summaryrefslogtreecommitdiffstats
path: root/test/test_tabular_output.py
blob: 45e97afd65c491b29b8ddc13251942f402950597 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Test the sql output adapter."""

from textwrap import dedent


from .utils import USER, PASSWORD, HOST, PORT, dbtest

import pytest
from mycli.main import MyCli

from pymysql.constants import FIELD_TYPE


@pytest.fixture
def mycli():
    cli = MyCli()
    cli.connect(None, USER, PASSWORD, HOST, PORT, None, init_command=None)
    return cli


@dbtest
def test_sql_output(mycli):
    """Test the sql output adapter."""
    headers = ["letters", "number", "optional", "float", "binary"]

    class FakeCursor(object):
        def __init__(self):
            self.data = [("abc", 1, None, 10.0, b"\xaa"), ("d", 456, "1", 0.5, b"\xaa\xbb")]
            self.description = [
                (None, FIELD_TYPE.VARCHAR),
                (None, FIELD_TYPE.LONG),
                (None, FIELD_TYPE.LONG),
                (None, FIELD_TYPE.FLOAT),
                (None, FIELD_TYPE.BLOB),
            ]

        def __iter__(self):
            return self

        def __next__(self):
            if self.data:
                return self.data.pop(0)
            else:
                raise StopIteration()

        def description(self):
            return self.description

    # Test sql-update output format
    assert list(mycli.change_table_format("sql-update")) == [(None, None, None, "Changed table format to sql-update")]
    mycli.formatter.query = ""
    output = mycli.format_output(None, FakeCursor(), headers)
    actual = "\n".join(output)
    assert actual == dedent("""\
            UPDATE `DUAL` SET
              `number` = 1
            , `optional` = NULL
            , `float` = 10.0e0
            , `binary` = X'aa'
            WHERE `letters` = 'abc';
            UPDATE `DUAL` SET
              `number` = 456
            , `optional` = '1'
            , `float` = 0.5e0
            , `binary` = X'aabb'
            WHERE `letters` = 'd';""")
    # Test sql-update-2 output format
    assert list(mycli.change_table_format("sql-update-2")) == [(None, None, None, "Changed table format to sql-update-2")]
    mycli.formatter.query = ""
    output = mycli.format_output(None, FakeCursor(), headers)
    assert "\n".join(output) == dedent("""\
            UPDATE `DUAL` SET
              `optional` = NULL
            , `float` = 10.0e0
            , `binary` = X'aa'
            WHERE `letters` = 'abc' AND `number` = 1;
            UPDATE `DUAL` SET
              `optional` = '1'
            , `float` = 0.5e0
            , `binary` = X'aabb'
            WHERE `letters` = 'd' AND `number` = 456;""")
    # Test sql-insert output format (without table name)
    assert list(mycli.change_table_format("sql-insert")) == [(None, None, None, "Changed table format to sql-insert")]
    mycli.formatter.query = ""
    output = mycli.format_output(None, FakeCursor(), headers)
    assert "\n".join(output) == dedent("""\
            INSERT INTO `DUAL` (`letters`, `number`, `optional`, `float`, `binary`) VALUES
              ('abc', 1, NULL, 10.0e0, X'aa')
            , ('d', 456, '1', 0.5e0, X'aabb')
            ;""")
    # Test sql-insert output format (with table name)
    assert list(mycli.change_table_format("sql-insert")) == [(None, None, None, "Changed table format to sql-insert")]
    mycli.formatter.query = "SELECT * FROM `table`"
    output = mycli.format_output(None, FakeCursor(), headers)
    assert "\n".join(output) == dedent("""\
            INSERT INTO table (`letters`, `number`, `optional`, `float`, `binary`) VALUES
              ('abc', 1, NULL, 10.0e0, X'aa')
            , ('d', 456, '1', 0.5e0, X'aabb')
            ;""")
    # Test sql-insert output format (with database + table name)
    assert list(mycli.change_table_format("sql-insert")) == [(None, None, None, "Changed table format to sql-insert")]
    mycli.formatter.query = "SELECT * FROM `database`.`table`"
    output = mycli.format_output(None, FakeCursor(), headers)
    assert "\n".join(output) == dedent("""\
            INSERT INTO database.table (`letters`, `number`, `optional`, `float`, `binary`) VALUES
              ('abc', 1, NULL, 10.0e0, X'aa')
            , ('d', 456, '1', 0.5e0, X'aabb')
            ;""")