# cgit navigation: summary | refs | log | tree | commit | diff | stats
# path: root/tests/unittests/command_parse/test_connection.py
# blob: 2a1de263ace4bff1a72f4e1e550a086ce514ee4a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def test_auth(judge_command):
    """Check ``auth <password>`` against the expected command/password fields.

    ``judge_command`` is the fixture supplied by the test suite; it receives
    the input line and the expected parsed fields.
    """
    expected_fields = {"command": "auth", "password": "123"}
    judge_command("auth 123", expected_fields)


def test_auth_redis6(judge_command):
    """Check AUTH parsing with the ``command_usernamex_password`` syntax.

    The test temporarily points ``command2syntax["AUTH"]`` at
    ``"command_usernamex_password"`` and verifies both the two-argument
    form (username + password) and the password-only form.

    The global mapping is restored in a ``finally`` block so that a failing
    assertion cannot leak the patched syntax into other tests.
    """
    from iredis.commands import command2syntax
    from iredis.redis_grammar import get_command_grammar

    # Drop any cached grammar so AUTH is looked up again after the patch
    # below (presumably the grammar is built from command2syntax -- the
    # original test clears the cache for this reason).
    get_command_grammar.cache_clear()

    old = command2syntax["AUTH"]
    command2syntax["AUTH"] = "command_usernamex_password"
    try:
        judge_command(
            "auth default 123",
            {"command": "auth", "password": "123", "username": "default"},
        )
        judge_command("AUTH 123", {"command": "AUTH", "password": "123"})
    finally:
        # Always undo the patch, even when an assertion above fails.
        command2syntax["AUTH"] = old
        # Discard grammars cached while the patch was active so later tests
        # do not reuse them.
        get_command_grammar.cache_clear()


def test_echo(judge_command):
    """Check ``echo <message>`` against the expected command/message fields."""
    expected_fields = {"command": "echo", "message": "hello"}
    judge_command("echo hello", expected_fields)


def test_ping(judge_command):
    """Check ``ping`` with one message, with no message, and with two words.

    The expected value ``None`` for the two-word form presumably means the
    input must not match the grammar -- confirm against the fixture.
    """
    cases = (
        ("ping hello", {"command": "ping", "message": "hello"}),
        ("ping", {"command": "ping", "message": None}),
        ("ping hello world", None),
    )
    for line, expected in cases:
        judge_command(line, expected)


def test_select(judge_command):
    """Check ``select`` for indexes 0-15, indexes 16-99 and a non-number.

    Indexes 0-15 are expected to parse with the index kept as a string;
    the remaining inputs are judged against ``None``.
    """
    # Indexes 0..15: expected to parse, index reported as text.
    for db in map(str, range(16)):
        judge_command(f"select {db}", {"command": "select", "index": db})

    # Indexes 16..99 and a non-numeric argument: judged against None.
    for db in range(16, 100):
        judge_command(f"select {db}", None)
    judge_command("select acb", None)


def test_swapdb(judge_command):
    """Check ``swapdb`` for every pair of indexes 0-15 plus two bad inputs.

    For valid pairs the expected ``index`` field is the list of both
    indexes as strings; the bad inputs are judged against ``None``.
    """
    db_numbers = [str(n) for n in range(16)]
    for first in db_numbers:
        for second in db_numbers:
            line = f"swapdb {first} {second}"
            judge_command(line, {"command": "swapdb", "index": [first, second]})

    # Non-numeric first index, then a missing second index.
    for rejected in ("swapdb abc 1", "swapdb 1"):
        judge_command(rejected, None)


def test_client_caching(judge_command):
    """Check ``CLIENT CACHING`` with YES, NO, no argument and a bad argument.

    The NO case deliberately uses several spaces before the argument.
    """
    cases = [
        ("CLIENT CACHING YES", {"command": "CLIENT CACHING", "yes": "YES"}),
        ("CLIENT CACHING   NO", {"command": "CLIENT CACHING", "yes": "NO"}),
        ("CLIENT CACHING", None),
        ("CLIENT CACHING abc", None),
    ]
    for line, expected in cases:
        judge_command(line, expected)


def test_client_tracking(judge_command):
    """Check ``CLIENT TRACKING`` with REDIRECT, PREFIX and trailing flags.

    Cases covered, in order:

    * lower-case ``on`` with no options;
    * ``ON REDIRECT <clientid>``;
    * ``ON PREFIX <prefix>``;
    * ``ON PREFIX <prefix> BCAST NOLOOP OPTIN``;
    * the same with several space-separated prefixes, which are expected
      back as one string in ``prefixes``.

    The ``ON PREFIX foo`` case used to be asserted twice with identical
    input and expectation; the redundant copy has been removed.
    """
    command = "CLIENT TRACKING"
    judge_command("CLIENT TRACKING on", {"command": command, "on_off": "on"})

    # Fields shared by every upper-case "ON" case below.
    enabled = {"command": command, "on_off": "ON"}
    judge_command(
        "CLIENT TRACKING ON REDIRECT 123",
        {**enabled, "redirect_const": "REDIRECT", "clientid": "123"},
    )
    judge_command(
        "CLIENT TRACKING ON PREFIX foo",
        {**enabled, "prefix_const": "PREFIX", "prefixes": "foo"},
    )

    # Fields expected when BCAST, NOLOOP and OPTIN are all present.
    flags = {
        "bcast_const": "BCAST",
        "noloop_const": "NOLOOP",
        "optin_const": "OPTIN",
    }
    judge_command(
        "CLIENT TRACKING ON PREFIX foo BCAST NOLOOP OPTIN",
        {**enabled, "prefix_const": "PREFIX", "prefixes": "foo", **flags},
    )
    judge_command(
        "CLIENT TRACKING ON PREFIX foo bar ok BCAST NOLOOP OPTIN",
        {**enabled, "prefix_const": "PREFIX", "prefixes": "foo bar ok", **flags},
    )


def test_client_pause(judge_command):
    """Check ``CLIENT PAUSE <timeout> WRITE`` against its expected fields."""
    expected_fields = {
        "command": "CLIENT PAUSE",
        "timeout": "20",
        "pause_type": "WRITE",
    }
    judge_command("CLIENT PAUSE 20 WRITE", expected_fields)