summary | refs | log | tree | commit | diff | stats
path: root/tests/test_server_connection.py
blob: 1fda258b61981d23c78b4f712bf085ccb482a569 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import asyncio
import json
import os
from threading import Thread
from unittest.mock import Mock

import pytest

from pygls import IS_PYODIDE
from pygls.server import LanguageServer

# `websockets` is an optional dependency: record whether it could be imported
# so that tests needing it can be skipped when it is missing.
try:
    import websockets
except ImportError:
    WEBSOCKETS_AVAILABLE = False
else:
    WEBSOCKETS_AVAILABLE = True


@pytest.mark.asyncio
@pytest.mark.skipif(IS_PYODIDE, reason="threads are not available in pyodide.")
async def test_tcp_connection_lost():
    """Check that the protocol is told when a TCP client connects and leaves.

    The server runs over TCP in a daemon thread on an OS-assigned port.  A
    client connects and then closes its socket; the mocked
    ``connection_made`` and ``connection_lost`` hooks must both be called.
    """

    async def wait_until(condition, timeout=10.0, interval=0.1):
        """Poll ``condition`` until it is truthy; return False on timeout."""
        clock = asyncio.get_running_loop().time
        deadline = clock() + timeout
        while not condition():
            if clock() >= deadline:
                return False
            await asyncio.sleep(interval)
        return True

    loop = asyncio.new_event_loop()

    server = LanguageServer("pygls-test", "v1", loop=loop)

    server.lsp.connection_made = Mock()
    server.lsp.connection_lost = Mock()

    # Run the server over TCP in a separate thread
    server_thread = Thread(
        target=server.start_tcp,
        args=(
            "127.0.0.1",
            0,
        ),
    )
    server_thread.daemon = True
    server_thread.start()

    # Wait for server to be ready; bounded so that a server thread which
    # fails during start-up fails the test instead of hanging it.
    assert await wait_until(
        lambda: server._server is not None
    ), "TCP server did not start in time"

    # Simulate client's connection
    port = server._server.sockets[0].getsockname()[1]
    _reader, writer = await asyncio.open_connection("127.0.0.1", port)
    try:
        # The hook is called from the server's thread, so poll for it rather
        # than relying on a fixed sleep.
        assert await wait_until(lambda: server.lsp.connection_made.called)
    finally:
        # Socket is closed (client's process is terminated).  Done in a
        # ``finally`` so the socket is released even if the assert fails.
        writer.close()
        await writer.wait_closed()

    assert await wait_until(lambda: server.lsp.connection_lost.called)


@pytest.mark.asyncio
@pytest.mark.skipif(IS_PYODIDE, reason="threads are not available in pyodide.")
async def test_io_connection_lost():
    """Check that the stdio server stops when the client's pipe is closed.

    The server runs ``start_io`` in a daemon thread on a pair of OS pipes.
    Once the mocked ``connection_made`` hook has been called, the client's
    write end is closed and the server thread is expected to finish.
    """
    timeout = 10.0

    # Client to Server pipe.
    csr, csw = os.pipe()
    # Server to client pipe.
    scr, scw = os.pipe()

    server = LanguageServer("pygls-test", "v1", loop=asyncio.new_event_loop())
    server.lsp.connection_made = Mock()
    server_thread = Thread(
        target=server.start_io, args=(os.fdopen(csr, "rb"), os.fdopen(scw, "wb"))
    )
    server_thread.daemon = True
    server_thread.start()

    client_write_open = True
    try:
        # Wait for server to be ready; bounded so that a broken server fails
        # the test instead of hanging it.
        clock = asyncio.get_running_loop().time
        deadline = clock() + timeout
        while not server.lsp.connection_made.called:
            assert clock() < deadline, "IO server did not start in time"
            await asyncio.sleep(0.1)

        # Pipe is closed (client's process is terminated)
        os.close(csw)
        client_write_open = False

        server_thread.join(timeout=timeout)
        assert (
            not server_thread.is_alive()
        ), "server did not stop after the client's pipe was closed"
    finally:
        # Release the descriptors this test still owns.  ``csr`` and ``scw``
        # were handed to the file objects passed to the server.
        if client_write_open:
            os.close(csw)
        os.close(scr)


@pytest.mark.asyncio
@pytest.mark.skipif(
    IS_PYODIDE or not WEBSOCKETS_AVAILABLE,
    reason="threads are not available in pyodide, or websockets is not installed",
)
async def test_ws_server():
    """Smoke test to ensure we can send/receive messages over websockets

    The server runs ``start_ws`` in a daemon thread on an OS-assigned port.
    The client sends ``initialize``, ``shutdown`` and ``exit`` and checks that
    the first two are answered with a result; the server thread is then
    expected to finish.
    """
    timeout = 10.0

    async def roundtrip(connection, message):
        """Send ``message`` and return the decoded reply, bounded by ``timeout``."""
        await connection.send(json.dumps(message))
        return json.loads(await asyncio.wait_for(connection.recv(), timeout))

    loop = asyncio.new_event_loop()
    server = LanguageServer("pygls-test", "v1", loop=loop)

    # Run the server over Websockets in a separate thread
    server_thread = Thread(
        target=server.start_ws,
        args=(
            "127.0.0.1",
            0,
        ),
    )
    server_thread.daemon = True
    server_thread.start()

    # Wait for server to be ready; bounded so that a server thread which
    # fails during start-up fails the test instead of hanging it.
    clock = asyncio.get_running_loop().time
    deadline = clock() + timeout
    while server._server is None:
        assert clock() < deadline, "websocket server did not start in time"
        await asyncio.sleep(0.1)

    port = server._server.sockets[0].getsockname()[1]
    # Simulate client's connection
    async with websockets.connect(f"ws://127.0.0.1:{port}") as connection:
        # Send an 'initialize' request
        msg = dict(
            jsonrpc="2.0", id=1, method="initialize", params=dict(capabilities=dict())
        )
        # Parse the reply instead of substring-matching the raw text, so an
        # error payload that merely mentions "result" cannot pass.
        reply = await roundtrip(connection, msg)
        assert reply.get("id") == 1
        assert "result" in reply

        # Shut the server down
        msg = dict(
            jsonrpc="2.0", id=2, method="shutdown", params=dict(capabilities=dict())
        )
        reply = await roundtrip(connection, msg)
        assert reply.get("id") == 2
        assert "result" in reply

        # Finally, tell it to exit
        # NOTE(review): LSP defines 'exit' as a notification, yet this message
        # carries an id (reusing 2); kept unchanged -- confirm it is intended.
        msg = dict(jsonrpc="2.0", id=2, method="exit", params=None)
        await connection.send(json.dumps(msg))

    server_thread.join(timeout=timeout)
    assert not server_thread.is_alive(), "server did not stop after 'exit'"