1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
def test_auth(judge_command):
judge_command("auth 123", {"command": "auth", "password": "123"})
def test_auth_redis6(judge_command):
    """Check AUTH parsing when its syntax is ``command_usernamex_password``.

    The test temporarily rebinds ``command2syntax["AUTH"]`` and then verifies
    both the ``auth <username> <password>`` and the ``AUTH <password>`` forms.

    The override mutates module-level state, so it is undone in a ``finally``
    block: a failing assertion inside ``judge_command`` must not leave the
    alternate syntax installed for whatever test runs next.
    """
    from iredis.commands import command2syntax
    from iredis.redis_grammar import get_command_grammar

    # Drop any previously cached grammar before swapping the syntax
    # (presumably so the AUTH grammar is rebuilt from the override).
    get_command_grammar.cache_clear()
    old = command2syntax["AUTH"]
    command2syntax["AUTH"] = "command_usernamex_password"
    try:
        judge_command(
            "auth default 123",
            {"command": "auth", "password": "123", "username": "default"},
        )
        judge_command("AUTH 123", {"command": "AUTH", "password": "123"})
    finally:
        # Always restore the original syntax, even when an assertion fails.
        command2syntax["AUTH"] = old
        # Discard the grammar cached under the override so later callers do
        # not pick up a stale entry built from the temporary syntax.
        get_command_grammar.cache_clear()
def test_echo(judge_command):
judge_command("echo hello", {"command": "echo", "message": "hello"})
def test_ping(judge_command):
judge_command("ping hello", {"command": "ping", "message": "hello"})
judge_command("ping", {"command": "ping", "message": None})
judge_command("ping hello world", None)
def test_select(judge_command):
for index in range(16):
judge_command(f"select {index}", {"command": "select", "index": str(index)})
for index in range(16, 100):
judge_command(f"select {index}", None)
judge_command("select acb", None)
def test_swapdb(judge_command):
for index1 in range(16):
for index2 in range(16):
judge_command(
f"swapdb {index1} {index2}",
{"command": "swapdb", "index": [str(index1), str(index2)]},
)
judge_command("swapdb abc 1", None)
judge_command("swapdb 1", None)
def test_client_caching(judge_command):
judge_command("CLIENT CACHING YES", {"command": "CLIENT CACHING", "yes": "YES"})
judge_command("CLIENT CACHING NO", {"command": "CLIENT CACHING", "yes": "NO"})
judge_command("CLIENT CACHING", None)
judge_command("CLIENT CACHING abc", None)
def test_client_tracking(judge_command):
judge_command("CLIENT TRACKING on", {"command": "CLIENT TRACKING", "on_off": "on"})
judge_command(
"CLIENT TRACKING ON REDIRECT 123",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"redirect_const": "REDIRECT",
"clientid": "123",
},
)
judge_command(
"CLIENT TRACKING ON PREFIX foo",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"prefix_const": "PREFIX",
"prefixes": "foo",
},
)
judge_command(
"CLIENT TRACKING ON PREFIX foo",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"prefix_const": "PREFIX",
"prefixes": "foo",
},
)
judge_command(
"CLIENT TRACKING ON PREFIX foo BCAST NOLOOP OPTIN",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"prefix_const": "PREFIX",
"prefixes": "foo",
"bcast_const": "BCAST",
"noloop_const": "NOLOOP",
"optin_const": "OPTIN",
},
)
judge_command(
"CLIENT TRACKING ON PREFIX foo bar ok BCAST NOLOOP OPTIN",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"prefix_const": "PREFIX",
"prefixes": "foo bar ok",
"bcast_const": "BCAST",
"noloop_const": "NOLOOP",
"optin_const": "OPTIN",
},
)
def test_client_pause(judge_command):
judge_command(
"CLIENT PAUSE 20 WRITE",
{"command": "CLIENT PAUSE", "timeout": "20", "pause_type": "WRITE"},
)
|