1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
import pexpect
import pytest
from textwrap import dedent
def test_start_on_connection_error():
    """Spawn iredis against port 12345 and expect a connection-refused error.

    The child's output is appended to ``cli_test.log``.  The log file is
    opened in a ``with`` block and the child is closed in ``finally`` so
    neither is left open when ``expect`` raises.
    """
    with open("cli_test.log", "ab") as log_file:
        cli = pexpect.spawn("iredis -p 12345", timeout=1)
        cli.logfile_read = log_file
        try:
            cli.expect(
                r"Error \d+ connecting to 127.0.0.1:12345. Connection refused."
            )
        finally:
            cli.close()
def test_start_with_client_name():
    """Start iredis with ``--client_name`` and read it back via CLIENT GETNAME.

    The output is expected to contain ``custom_name`` after the command is
    sent.
    """
    command = "iredis --client_name custom_name"
    session = pexpect.spawn(command, timeout=2)

    # Wait until "iredis" shows up in the output before typing anything.
    session.expect("iredis")

    session.sendline("CLIENT GETNAME")
    session.expect("custom_name")

    session.close()
def test_short_help_option(config):
    """Exercise both uses of ``-h``: bare, and followed by a host.

    A bare ``-h`` is expected to print the help text; ``-h 127.0.0.1`` is
    expected to reach the ``127.0.0.1:6379>`` prompt.

    ``config`` is not used directly in the body; presumably it is a pytest
    fixture defined elsewhere that prepares the iredis configuration.
    """
    help_cli = pexpect.spawn("iredis -h", timeout=2)
    try:
        help_cli.expect("Show this message and exit.")
    finally:
        # Close the help process before starting the next one so it is
        # not leaked.
        help_cli.close()

    host_cli = pexpect.spawn("iredis -h 127.0.0.1")
    try:
        host_cli.expect("127.0.0.1:6379>")
    finally:
        host_cli.close()
@pytest.mark.skipif("int(os.environ['REDIS_VERSION']) != 5")
def test_server_version_in_starting_on5():
    """Expect ``redis-server 5`` in the output of a freshly started iredis.

    Skipped unless the ``REDIS_VERSION`` environment variable equals 5.
    """
    session = pexpect.spawn(command="iredis", timeout=2)
    expected_text = "redis-server 5"
    session.expect(expected_text)
    session.close()
@pytest.mark.skipif("int(os.environ['REDIS_VERSION']) != 6")
def test_server_version_in_starting_on6():
    """Expect ``redis-server 6`` in the output of a freshly started iredis.

    Skipped unless the ``REDIS_VERSION`` environment variable equals 6.
    """
    session = pexpect.spawn(command="iredis", timeout=2)
    expected_text = "redis-server 6"
    session.expect(expected_text)
    session.close()
def test_connection_using_url(clean_redis):
    """Connect with ``--url redis://localhost:6379/7`` and run a SET command.

    ``clean_redis`` is not used directly in the body; presumably it is a
    pytest fixture defined elsewhere that resets the server state.

    The child's output is appended to ``cli_test.log``.  The log file is
    opened in a ``with`` block and the child is closed in ``finally`` so
    neither is left open when an expectation fails.
    """
    with open("cli_test.log", "ab") as log_file:
        c = pexpect.spawn("iredis --url redis://localhost:6379/7", timeout=2)
        c.logfile_read = log_file
        try:
            # Match literally: as a regex, "[7]" is a character class and
            # would never match the "[7]" text in the prompt.
            c.expect_exact(["iredis", "127.0.0.1:6379[7]>"])
            c.sendline("set current-db 7")
            c.expect("OK")
        finally:
            c.close()
def test_connection_using_url_from_env(clean_redis, monkeypatch):
    """Connect using the ``IREDIS_URL`` environment variable and run a SET.

    ``monkeypatch`` sets ``IREDIS_URL`` to ``redis://localhost:6379/7``
    before iredis is spawned with no arguments.  ``clean_redis`` is not used
    directly in the body; presumably it is a pytest fixture defined
    elsewhere that resets the server state.

    The child's output is appended to ``cli_test.log``.  The log file is
    opened in a ``with`` block and the child is closed in ``finally`` so
    neither is left open when an expectation fails.
    """
    monkeypatch.setenv("IREDIS_URL", "redis://localhost:6379/7")
    with open("cli_test.log", "ab") as log_file:
        c = pexpect.spawn("iredis", timeout=2)
        c.logfile_read = log_file
        try:
            # Match literally: as a regex, "[7]" is a character class and
            # would never match the "[7]" text in the prompt.
            c.expect_exact(["iredis", "localhost:6379[7]>"])
            c.sendline("set current-db 7")
            c.expect("OK")
        finally:
            c.close()
@pytest.mark.xfail(reason="current test in github action, socket not supported.")
# https://github.community/t5/GitHub-Actions/Job-service-command/td-p/33901#
# https://help.github.com/en/actions/reference/workflow-syntax-for-github-actions#jobsjob_idservices
def test_connect_via_socket(fake_redis_socket):
    """Connect through the unix socket ``/tmp/test.sock`` using a custom rc file.

    A minimal config is written to ``/tmp/iredisrc`` and passed via
    ``--iredisrc``; the output is then expected to contain
    ``redis /tmp/test.sock``.  ``fake_redis_socket`` is not used directly in
    the body; presumably it is a pytest fixture defined elsewhere that
    provides the socket.

    The child's output is appended to ``cli_test.log``.  The log file is
    opened in a ``with`` block and the child is closed in ``finally`` so
    neither is left open when the expectation fails.
    """
    config_content = dedent(
        """
        [main]
        log_location = /tmp/iredis1.log
        no_info=True
        """
    )
    with open("/tmp/iredisrc", "w+") as etc_config:
        etc_config.write(config_content)

    with open("cli_test.log", "ab") as log_file:
        c = pexpect.spawn(
            "iredis --iredisrc /tmp/iredisrc -s /tmp/test.sock", timeout=2
        )
        c.logfile_read = log_file
        try:
            c.expect("redis /tmp/test.sock")
        finally:
            c.close()
def test_iredis_start_with_prompt():
    """Start iredis with a custom ``--prompt`` template and check its rendering.

    The template ``{host}abc{port}def{client_name}`` is expected to appear
    in the output as ``127.0.0.1abc6379defNone``.

    The child's output is appended to ``cli_test.log``.  The log file is
    opened in a ``with`` block and the child is closed in ``finally`` so
    neither is left open when an expectation fails.
    """
    with open("cli_test.log", "ab") as log_file:
        cli = pexpect.spawn(
            "iredis --prompt '{host}abc{port}def{client_name}'", timeout=2
        )
        cli.logfile_read = log_file
        try:
            cli.expect("iredis")
            cli.expect("127.0.0.1abc6379defNone")
        finally:
            cli.close()
|