summaryrefslogtreecommitdiffstats
path: root/test/modules/http2/test_800_websockets.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/modules/http2/test_800_websockets.py')
-rw-r--r--test/modules/http2/test_800_websockets.py363
1 files changed, 363 insertions, 0 deletions
diff --git a/test/modules/http2/test_800_websockets.py b/test/modules/http2/test_800_websockets.py
new file mode 100644
index 0000000..5b46da8
--- /dev/null
+++ b/test/modules/http2/test_800_websockets.py
@@ -0,0 +1,363 @@
+import inspect
+import logging
+import os
+import shutil
+import subprocess
+import time
+from datetime import timedelta, datetime
+from typing import Tuple, List
+import packaging.version
+
+import pytest
+import websockets
+from pyhttpd.result import ExecResult
+from pyhttpd.ws_util import WsFrameReader, WsFrame
+
+from .env import H2Conf, H2TestEnv
+
+
+log = logging.getLogger(__name__)
+
+ws_version = packaging.version.parse(websockets.version.version)
+ws_version_min = packaging.version.Version('10.4')
+
+
+def ws_run(env: H2TestEnv, path, authority=None, do_input=None, inbytes=None,
+ send_close=True, timeout=5, scenario='ws-stdin',
+ wait_close: float = 0.0) -> Tuple[ExecResult, List[str], List[WsFrame]]:
+ """ Run the h2ws test client in various scenarios with given input and
+ timings.
+ :param env: the test environment
+ :param path: the path on the Apache server to CONNECt to
+ :param authority: the host:port to use as
+ :param do_input: a Callable for sending input to h2ws
+ :param inbytes: fixed bytes to send to h2ws, unless do_input is given
+ :param send_close: send a CLOSE WebSockets frame at the end
+ :param timeout: timeout for waiting on h2ws to finish
+ :param scenario: name of scenario h2ws should run in
+ :param wait_close: time to wait before closing input
+ :return: ExecResult with exit_code/stdout/stderr of run
+ """
+ h2ws = os.path.join(env.clients_dir, 'h2ws')
+ if not os.path.exists(h2ws):
+ pytest.fail(f'test client not build: {h2ws}')
+ if authority is None:
+ authority = f'cgi.{env.http_tld}:{env.http_port}'
+ args = [
+ h2ws, '-vv', '-c', f'localhost:{env.http_port}',
+ f'ws://{authority}{path}',
+ scenario
+ ]
+ # we write all output to files, because we manipulate input timings
+ # and would run in deadlock situations with h2ws blocking operations
+ # because its output is not consumed
+ start = datetime.now()
+ with open(f'{env.gen_dir}/h2ws.stdout', 'w') as fdout:
+ with open(f'{env.gen_dir}/h2ws.stderr', 'w') as fderr:
+ proc = subprocess.Popen(args=args, stdin=subprocess.PIPE,
+ stdout=fdout, stderr=fderr)
+ if do_input is not None:
+ do_input(proc)
+ elif inbytes is not None:
+ proc.stdin.write(inbytes)
+ proc.stdin.flush()
+
+ if wait_close > 0:
+ time.sleep(wait_close)
+ try:
+ inbytes = WsFrame.client_close(code=1000).to_network() if send_close else None
+ proc.communicate(input=inbytes, timeout=timeout)
+ except subprocess.TimeoutExpired:
+ log.error(f'ws_run: timeout expired')
+ proc.kill()
+ proc.communicate(timeout=timeout)
+ end = datetime.now()
+ lines = open(f'{env.gen_dir}/h2ws.stdout').read().splitlines()
+ infos = [line for line in lines if line.startswith('[1] ')]
+ hex_content = ' '.join([line for line in lines if not line.startswith('[1] ')])
+ if len(infos) > 0 and infos[0] == '[1] :status: 200':
+ frames = WsFrameReader.parse(bytearray.fromhex(hex_content))
+ else:
+ frames = bytearray.fromhex(hex_content)
+ return ExecResult(args=args, exit_code=proc.returncode,
+ stdout=b'', stderr=b'', duration=end - start), infos, frames
+
+
+@pytest.mark.skipif(condition=H2TestEnv.is_unsupported, reason="mod_http2 not supported here")
+@pytest.mark.skipif(condition=not H2TestEnv().httpd_is_at_least("2.4.58"),
+ reason=f'need at least httpd 2.4.58 for this')
+@pytest.mark.skipif(condition=ws_version < ws_version_min,
+ reason=f'websockets is {ws_version}, need at least {ws_version_min}')
+class TestWebSockets:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env):
+ # Apache config that CONNECT proxies a WebSocket server for paths starting
+ # with '/ws/'
+ # The WebSocket server is started in pytest fixture 'ws_server' below.
+ conf = H2Conf(env, extras={
+ 'base': [
+ 'Timeout 1',
+ ],
+ f'cgi.{env.http_tld}': [
+ f' H2WebSockets on',
+ f' ProxyPass /ws/ http://127.0.0.1:{env.ws_port}/ \\',
+ f' upgrade=websocket timeout=10',
+ f' ReadBufferSize 65535'
+ ]
+ })
+ conf.add_vhost_cgi(proxy_self=True, h2proxy_self=True).install()
+ conf.add_vhost_test1(proxy_self=True, h2proxy_self=True).install()
+ assert env.apache_restart() == 0
+
+ def ws_check_alive(self, env, timeout=5):
+ url = f'http://localhost:{env.ws_port}/'
+ end = datetime.now() + timedelta(seconds=timeout)
+ while datetime.now() < end:
+ r = env.curl_get(url, 5)
+ if r.exit_code == 0:
+ return True
+ time.sleep(.1)
+ return False
+
+ def _mkpath(self, path):
+ if not os.path.exists(path):
+ return os.makedirs(path)
+
+ def _rmrf(self, path):
+ if os.path.exists(path):
+ return shutil.rmtree(path)
+
+ @pytest.fixture(autouse=True, scope='class')
+ def ws_server(self, env):
+ # Run our python websockets server that has some special behaviour
+ # for the different path to CONNECT to.
+ run_dir = os.path.join(env.gen_dir, 'ws-server')
+ err_file = os.path.join(run_dir, 'stderr')
+ self._rmrf(run_dir)
+ self._mkpath(run_dir)
+ with open(err_file, 'w') as cerr:
+ cmd = os.path.join(os.path.dirname(inspect.getfile(TestWebSockets)),
+ 'ws_server.py')
+ args = ['python3', cmd, '--port', str(env.ws_port)]
+ p = subprocess.Popen(args=args, cwd=run_dir, stderr=cerr,
+ stdout=cerr)
+ if not self.ws_check_alive(env):
+ p.kill()
+ p.wait()
+ pytest.fail(f'ws_server did not start. stderr={open(err_file).readlines()}')
+ yield
+ p.terminate()
+
+ # CONNECT with invalid :protocol header, must fail
+ def test_h2_800_01_fail_proto(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='fail-proto')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 501', '[1] EOF'], f'{r}'
+ env.httpd_error_log.ignore_recent()
+
+ # a correct CONNECT, send CLOSE, expect CLOSE, basic success
+ def test_h2_800_02_ws_empty(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 1, f'{frames}'
+ assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+
+ # CONNECT to a URL path that does not exist on the server
+ def test_h2_800_03_not_found(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/does-not-exist')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 404', '[1] EOF'], f'{r}'
+
+ # CONNECT to a URL path that is a normal HTTP file resource
+ # we do not want to receive the body of that
+ def test_h2_800_04_non_ws_resource(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/alive.json')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+ assert frames == b''
+
+ # CONNECT to a URL path that sends a delayed HTTP response body
+ # we do not want to receive the body of that
+ def test_h2_800_05_non_ws_delay_resource(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/h2test/error?body_delay=100ms')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+ assert frames == b''
+
+ # CONNECT missing the sec-webSocket-version header
+ def test_h2_800_06_miss_version(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-version')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}'
+
+ # CONNECT missing the :path header
+ def test_h2_800_07_miss_path(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-path')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] RST'], f'{r}'
+
+ # CONNECT missing the :scheme header
+ def test_h2_800_08_miss_scheme(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-scheme')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] RST'], f'{r}'
+
+ # CONNECT missing the :authority header
+ def test_h2_800_09a_miss_authority(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-authority')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] RST'], f'{r}'
+
+ # CONNECT to authority with disabled websockets
+ def test_h2_800_09b_unsupported(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/',
+ authority=f'test1.{env.http_tld}:{env.http_port}')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 501', '[1] EOF'], f'{r}'
+
+ # CONNECT and exchange a PING
+ def test_h2_800_10_ws_ping(self, env: H2TestEnv, ws_server):
+ ping = WsFrame.client_ping(b'12345')
+ r, infos, frames = ws_run(env, path='/ws/echo/', inbytes=ping.to_network())
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 2, f'{frames}'
+ assert frames[0].opcode == WsFrame.PONG, f'{frames}'
+ assert frames[0].data == ping.data, f'{frames}'
+ assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+ # CONNECT and send several PINGs with a delay of 200ms
+ def test_h2_800_11_ws_timed_pings(self, env: H2TestEnv, ws_server):
+ frame_count = 5
+ ping = WsFrame.client_ping(b'12345')
+
+ def do_send(proc):
+ for _ in range(frame_count):
+ try:
+ proc.stdin.write(ping.to_network())
+ proc.stdin.flush()
+ proc.wait(timeout=0.2)
+ except subprocess.TimeoutExpired:
+ pass
+
+ r, infos, frames = ws_run(env, path='/ws/echo/', do_input=do_send)
+ assert r.exit_code == 0
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == frame_count + 1, f'{frames}'
+ assert frames[-1].opcode == WsFrame.CLOSE, f'{frames}'
+ for i in range(frame_count):
+ assert frames[i].opcode == WsFrame.PONG, f'{frames}'
+ assert frames[i].data == ping.data, f'{frames}'
+
+ # CONNECT to path that closes immediately
+ def test_h2_800_12_ws_unknown(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/unknown')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 1, f'{frames}'
+ # expect a CLOSE with code=4999, reason='path unknown'
+ assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+ assert frames[0].data[2:].decode() == 'path unknown', f'{frames}'
+
+ # CONNECT to a path that sends us 1 TEXT frame
+ def test_h2_800_13_ws_text(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/text/')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 2, f'{frames}'
+ assert frames[0].opcode == WsFrame.TEXT, f'{frames}'
+ assert frames[0].data.decode() == 'hello!', f'{frames}'
+ assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+ # CONNECT to a path that sends us a named file in BINARY frames
+ @pytest.mark.parametrize("fname,flen", [
+ ("data-1k", 1000),
+ ("data-10k", 10000),
+ ("data-100k", 100*1000),
+ ("data-1m", 1000*1000),
+ ])
+ def test_h2_800_14_ws_file(self, env: H2TestEnv, ws_server, fname, flen):
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}', wait_close=0.5)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == flen, f'{frames}'
+
+ # CONNECT to path with 1MB file and trigger varying BINARY frame lengths
+ @pytest.mark.parametrize("frame_len", [
+ 1000 * 1024,
+ 100 * 1024,
+ 10 * 1024,
+ 1 * 1024,
+ 512,
+ ])
+ def test_h2_800_15_ws_frame_len(self, env: H2TestEnv, ws_server, frame_len):
+ fname = "data-1m"
+ flen = 1000*1000
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}', wait_close=0.5)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == flen, f'{frames}'
+
+ # CONNECT to path with 1MB file and trigger delays between BINARY frame writes
+ @pytest.mark.parametrize("frame_delay", [
+ 1,
+ 10,
+ 50,
+ 100,
+ ])
+ def test_h2_800_16_ws_frame_delay(self, env: H2TestEnv, ws_server, frame_delay):
+ fname = "data-1m"
+ flen = 1000*1000
+ # adjust frame_len to allow for 1 second overall duration
+ frame_len = int(flen / (1000 / frame_delay))
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}/{frame_delay}',
+ wait_close=1.5)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == flen, f'{frames}\n{r}'
+
+ # CONNECT to path with 1MB file and trigger delays between BINARY frame writes
+ @pytest.mark.parametrize("frame_len", [
+ 64 * 1024,
+ 16 * 1024,
+ 1 * 1024,
+ ])
+ def test_h2_800_17_ws_throughput(self, env: H2TestEnv, ws_server, frame_len):
+ fname = "data-1m"
+ flen = 1000*1000
+ ncount = 5
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}/0/{ncount}',
+ wait_close=0.1, send_close=False, timeout=30)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == ncount * flen, f'{frames}\n{r}'
+ # to see these logged, invoke: `pytest -o log_cli=true`
+ log.info(f'throughput (frame-len={frame_len}): "'
+ f'"{(total_len / (1024*1024)) / r.duration.total_seconds():0.2f} MB/s')
+
+ # Check that the tunnel timeout is observed, e.g. the longer holds and
+ # the 1sec cleint conn timeout does not trigger
+ def test_h2_800_18_timeout(self, env: H2TestEnv, ws_server):
+ fname = "data-10k"
+ frame_delay = 1500
+ flen = 10*1000
+ frame_len = 8192
+ # adjust frame_len to allow for 1 second overall duration
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}/{frame_delay}',
+ wait_close=2)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == flen, f'{frames}\n{r}'
+