81 lines
2.1 KiB
Python
81 lines
2.1 KiB
Python
# mypy: allow-untyped-defs
|
|
|
|
try:
|
|
from importlib import reload
|
|
except ImportError:
|
|
pass
|
|
import json
|
|
import os
|
|
import queue
|
|
import tempfile
|
|
import threading
|
|
|
|
import pytest
|
|
|
|
from . import serve
|
|
from wptserve import logger
|
|
|
|
|
|
class ServerProcSpy(serve.ServerProc):
    """Variant of ``serve.ServerProc`` that reports each started instance.

    While the class attribute ``instances`` is not ``None``, every instance
    is handed to its ``put`` method once ``start`` has returned. While it is
    ``None`` nothing is recorded.
    """

    # Shared sink for started instances; ``None`` disables recording.
    instances = None

    def start(self, *args, **kwargs):
        """Start via the parent implementation, then record this instance.

        All arguments are forwarded unchanged, and whatever the parent's
        ``start`` returns is returned unchanged.
        """
        outcome = super().start(*args, **kwargs)

        sink = ServerProcSpy.instances
        if sink is not None:
            sink.put(self)

        return outcome
|
|
|
|
|
|
# Rebind the name in the `serve` module at import time so that any later
# lookup of `serve.ServerProc` resolves to the spy subclass.
# NOTE(review): presumably `serve.run` creates its processes through this
# module-level name -- confirm against `serve`.
serve.ServerProc = ServerProcSpy  # type: ignore
|
|
|
|
|
|
@pytest.fixture()
def server_subprocesses():
    """Provide a fresh queue that collects started ``ServerProcSpy`` objects.

    Recording is switched on for the duration of the test by publishing the
    queue on ``ServerProcSpy.instances``; teardown switches it off again by
    resetting that attribute to ``None``.
    """
    started = queue.Queue()
    ServerProcSpy.instances = started
    yield started
    ServerProcSpy.instances = None
|
|
|
|
|
|
@pytest.fixture()
def tempfile_name():
    """Yield the path of a temporary file created with ``tempfile.mkstemp``.

    The descriptor returned by ``mkstemp`` stays open while the test runs;
    teardown closes it and then deletes the file.
    """
    descriptor, path = tempfile.mkstemp()
    yield path
    os.close(descriptor)
    os.remove(path)
|
|
|
|
|
|
def test_subprocess_exit(server_subprocesses, tempfile_name):
    """Check that ``serve.run`` returns once a server subprocess has exited.

    ``serve.run`` is executed on a background thread. After one of the
    recorded subprocesses has been asked to shut down and has been waited
    on, that thread must finish within ``timeout`` seconds.
    """
    timeout = 30

    def target():
        # Out of the box the server first spawns a child process that
        # validates the local system configuration. That child has nothing
        # to do with the behavior under test, yet (at the time of writing)
        # it is built with the same constructor as the long-running
        # processes this test cares about. Turning the check off ensures
        # the constructor is only used for the relevant processes.
        overrides = {"check_subdomains": False, "bind_address": False}
        with open(tempfile_name, 'w') as config_file:
            json.dump(overrides, config_file)

        # The `logger` module of the wptserve package follows a singleton
        # pattern that resists testing. Reloading it explicitly throws away
        # any pre-existing state, so this test does not conflict with other
        # tests that rely on the same module.
        reload(logger)

        serve.run(config_path=tempfile_name)

    server_thread = threading.Thread(target=target)
    server_thread.start()

    # The first recorded process is taken off the queue and not used
    # further; the second one is the process that gets shut down.
    server_subprocesses.get(block=True, timeout=timeout)
    spied_proc = server_subprocesses.get(block=True, timeout=timeout)
    spied_proc.request_shutdown()
    spied_proc.wait()

    server_thread.join(timeout=timeout)

    assert not server_thread.is_alive()
|