#!/usr/bin/env python # # Copyright 2012, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """End-to-end tests for pywebsocket3. Tests standalone.py. """ from __future__ import absolute_import import locale import logging import os import socket import subprocess import sys import time import unittest from six.moves import urllib import set_sys_path # Update sys.path to locate pywebsocket3 module. from test import client_for_testing # Special message that tells the echo server to start closing handshake _GOODBYE_MESSAGE = 'Goodbye' _SERVER_WARMUP_IN_SEC = 0.2 # Test body functions def _echo_check_procedure(client): client.connect() client.send_message('test') client.assert_receive('test') client.send_message('helloworld') client.assert_receive('helloworld') client.send_close() client.assert_receive_close() client.assert_connection_closed() def _echo_check_procedure_with_binary(client): client.connect() client.send_message(b'binary', binary=True) client.assert_receive(b'binary', binary=True) client.send_message(b'\x00\x80\xfe\xff\x00\x80', binary=True) client.assert_receive(b'\x00\x80\xfe\xff\x00\x80', binary=True) client.send_close() client.assert_receive_close() client.assert_connection_closed() def _echo_check_procedure_with_goodbye(client): client.connect() client.send_message('test') client.assert_receive('test') client.send_message(_GOODBYE_MESSAGE) client.assert_receive(_GOODBYE_MESSAGE) client.assert_receive_close() client.send_close() client.assert_connection_closed() def _echo_check_procedure_with_code_and_reason(client, code, reason): client.connect() client.send_close(code, reason) client.assert_receive_close(code, reason) client.assert_connection_closed() def _unmasked_frame_check_procedure(client): client.connect() client.send_message('test', mask=False) client.assert_receive_close(client_for_testing.STATUS_PROTOCOL_ERROR, '') client.assert_connection_closed() def _check_handshake_with_basic_auth(client): client.connect() client.send_message(_GOODBYE_MESSAGE) client.assert_receive(_GOODBYE_MESSAGE) client.assert_receive_close() client.send_close() client.assert_connection_closed() class EndToEndTestBase(unittest.TestCase): """Base class for end-to-end tests that launch pywebsocket standalone server as a separate process, connect to it using the client_for_testing module, and check if the server behaves correctly by exchanging opening handshake and frames over a TCP connection. """ def setUp(self): self.server_stderr = None self.top_dir = os.path.join(os.path.dirname(__file__), '..') os.putenv('PYTHONPATH', os.path.pathsep.join(sys.path)) self.standalone_command = os.path.join(self.top_dir, 'pywebsocket3', 'standalone.py') self.document_root = os.path.join(self.top_dir, 'example') s = socket.socket() s.bind(('localhost', 0)) (_, self.test_port) = s.getsockname() s.close() self._options = client_for_testing.ClientOptions() self._options.server_host = 'localhost' self._options.origin = 'http://localhost' self._options.resource = '/echo' self._options.server_port = self.test_port # TODO(tyoshino): Use tearDown to kill the server. def _run_python_command(self, commandline, stdout=None, stderr=None): close_fds = True if sys.platform != 'win32' else None return subprocess.Popen([sys.executable] + commandline, close_fds=close_fds, stdout=stdout, stderr=stderr) def _run_server(self, extra_args=[]): args = [ self.standalone_command, '-H', 'localhost', '-V', 'localhost', '-p', str(self.test_port), '-P', str(self.test_port), '-d', self.document_root ] # Inherit the level set to the root logger by test runner. root_logger = logging.getLogger() log_level = root_logger.getEffectiveLevel() if log_level != logging.NOTSET: args.append('--log-level') args.append(logging.getLevelName(log_level).lower()) args += extra_args return self._run_python_command(args, stderr=self.server_stderr) def _close_server(self, server): """ This method mimics Popen.__exit__ to gracefully kill the server process. Its main purpose is to maintain comptaibility between python 2 and 3, since Popen in python 2 does not have __exit__ attribute. """ server.kill() if server.stdout: server.stdout.close() if server.stderr: server.stderr.close() if server.stdin: server.stdin.close() server.wait() class EndToEndHyBiTest(EndToEndTestBase): def setUp(self): EndToEndTestBase.setUp(self) def _run_test_with_options(self, test_function, options, server_options=[]): server = self._run_server(server_options) try: # TODO(tyoshino): add some logic to poll the server until it # becomes ready time.sleep(_SERVER_WARMUP_IN_SEC) client = client_for_testing.create_client(options) try: test_function(client) finally: client.close_socket() finally: self._close_server(server) def _run_test(self, test_function): self._run_test_with_options(test_function, self._options) def _run_permessage_deflate_test(self, offer, response_checker, test_function): server = self._run_server() try: time.sleep(_SERVER_WARMUP_IN_SEC) self._options.extensions += offer self._options.check_permessage_deflate = response_checker client = client_for_testing.create_client(self._options) try: client.connect() if test_function is not None: test_function(client) client.assert_connection_closed() finally: client.close_socket() finally: self._close_server(server) def _run_close_with_code_and_reason_test(self, test_function, code, reason, server_options=[]): server = self._run_server() try: time.sleep(_SERVER_WARMUP_IN_SEC) client = client_for_testing.create_client(self._options) try: test_function(client, code, reason) finally: client.close_socket() finally: self._close_server(server) def _run_http_fallback_test(self, options, status): server = self._run_server() try: time.sleep(_SERVER_WARMUP_IN_SEC) client = client_for_testing.create_client(options) try: client.connect() self.fail('Could not catch HttpStatusException') except client_for_testing.HttpStatusException as e: self.assertEqual(status, e.status) except Exception as e: self.fail('Catch unexpected exception') finally: client.close_socket() finally: self._close_server(server) def test_echo(self): self._run_test(_echo_check_procedure) def test_echo_binary(self): self._run_test(_echo_check_procedure_with_binary) def test_echo_server_close(self): self._run_test(_echo_check_procedure_with_goodbye) def test_unmasked_frame(self): self._run_test(_unmasked_frame_check_procedure) def test_echo_permessage_deflate(self): def test_function(client): # From the examples in the spec. compressed_hello = b'\xf2\x48\xcd\xc9\xc9\x07\x00' client._stream.send_data(compressed_hello, client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.assert_receive_binary( compressed_hello, opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client.send_close() client.assert_receive_close() def response_checker(parameter): self.assertEqual('permessage-deflate', parameter.name()) self.assertEqual([], parameter.get_parameters()) self._run_permessage_deflate_test(['permessage-deflate'], response_checker, test_function) def test_echo_permessage_deflate_two_frames(self): def test_function(client): # From the examples in the spec. client._stream.send_data(b'\xf2\x48\xcd', client_for_testing.OPCODE_TEXT, end=False, rsv1=1) client._stream.send_data(b'\xc9\xc9\x07\x00', client_for_testing.OPCODE_TEXT) client._stream.assert_receive_binary( b'\xf2\x48\xcd\xc9\xc9\x07\x00', opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client.send_close() client.assert_receive_close() def response_checker(parameter): self.assertEqual('permessage-deflate', parameter.name()) self.assertEqual([], parameter.get_parameters()) self._run_permessage_deflate_test(['permessage-deflate'], response_checker, test_function) def test_echo_permessage_deflate_two_messages(self): def test_function(client): # From the examples in the spec. client._stream.send_data(b'\xf2\x48\xcd\xc9\xc9\x07\x00', client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.send_data(b'\xf2\x00\x11\x00\x00', client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.assert_receive_binary( b'\xf2\x48\xcd\xc9\xc9\x07\x00', opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.assert_receive_binary( b'\xf2\x00\x11\x00\x00', opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client.send_close() client.assert_receive_close() def response_checker(parameter): self.assertEqual('permessage-deflate', parameter.name()) self.assertEqual([], parameter.get_parameters()) self._run_permessage_deflate_test(['permessage-deflate'], response_checker, test_function) def test_echo_permessage_deflate_two_msgs_server_no_context_takeover(self): def test_function(client): # From the examples in the spec. client._stream.send_data(b'\xf2\x48\xcd\xc9\xc9\x07\x00', client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.send_data(b'\xf2\x00\x11\x00\x00', client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.assert_receive_binary( b'\xf2\x48\xcd\xc9\xc9\x07\x00', opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.assert_receive_binary( b'\xf2\x48\xcd\xc9\xc9\x07\x00', opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client.send_close() client.assert_receive_close() def response_checker(parameter): self.assertEqual('permessage-deflate', parameter.name()) self.assertEqual([('server_no_context_takeover', None)], parameter.get_parameters()) self._run_permessage_deflate_test( ['permessage-deflate; server_no_context_takeover'], response_checker, test_function) def test_echo_permessage_deflate_preference(self): def test_function(client): # From the examples in the spec. compressed_hello = b'\xf2\x48\xcd\xc9\xc9\x07\x00' client._stream.send_data(compressed_hello, client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.assert_receive_binary( compressed_hello, opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client.send_close() client.assert_receive_close() def response_checker(parameter): self.assertEqual('permessage-deflate', parameter.name()) self.assertEqual([], parameter.get_parameters()) self._run_permessage_deflate_test( ['permessage-deflate', 'deflate-frame'], response_checker, test_function) def test_echo_permessage_deflate_with_parameters(self): def test_function(client): # From the examples in the spec. compressed_hello = b'\xf2\x48\xcd\xc9\xc9\x07\x00' client._stream.send_data(compressed_hello, client_for_testing.OPCODE_TEXT, rsv1=1) client._stream.assert_receive_binary( compressed_hello, opcode=client_for_testing.OPCODE_TEXT, rsv1=1) client.send_close() client.assert_receive_close() def response_checker(parameter): self.assertEqual('permessage-deflate', parameter.name()) self.assertEqual([('server_max_window_bits', '10'), ('server_no_context_takeover', None)], parameter.get_parameters()) self._run_permessage_deflate_test([ 'permessage-deflate; server_max_window_bits=10; ' 'server_no_context_takeover' ], response_checker, test_function) def test_echo_permessage_deflate_with_bad_server_max_window_bits(self): def test_function(client): client.send_close() client.assert_receive_close() def response_checker(parameter): raise Exception('Unexpected acceptance of permessage-deflate') self._run_permessage_deflate_test( ['permessage-deflate; server_max_window_bits=3000000'], response_checker, test_function) def test_echo_permessage_deflate_with_bad_server_max_window_bits(self): def test_function(client): client.send_close() client.assert_receive_close() def response_checker(parameter): raise Exception('Unexpected acceptance of permessage-deflate') self._run_permessage_deflate_test( ['permessage-deflate; server_max_window_bits=3000000'], response_checker, test_function) def test_echo_permessage_deflate_with_undefined_parameter(self): def test_function(client): client.send_close() client.assert_receive_close() def response_checker(parameter): raise Exception('Unexpected acceptance of permessage-deflate') self._run_permessage_deflate_test(['permessage-deflate; foo=bar'], response_checker, test_function) def test_echo_close_with_code_and_reason(self): self._options.resource = '/close' self._run_close_with_code_and_reason_test( _echo_check_procedure_with_code_and_reason, 3333, 'sunsunsunsun') def test_echo_close_with_empty_body(self): self._options.resource = '/close' self._run_close_with_code_and_reason_test( _echo_check_procedure_with_code_and_reason, None, '') def test_close_on_protocol_error(self): """Tests that the server sends a close frame with protocol error status code when the client sends data with some protocol error. """ def test_function(client): client.connect() # Intermediate frame without any preceding start of fragmentation # frame. client.send_frame_of_arbitrary_bytes(b'\x80\x80', '') client.assert_receive_close( client_for_testing.STATUS_PROTOCOL_ERROR) self._run_test(test_function) def test_close_on_unsupported_frame(self): """Tests that the server sends a close frame with unsupported operation status code when the client sends data asking some operation that is not supported by the server. """ def test_function(client): client.connect() # Text frame with RSV3 bit raised. client.send_frame_of_arbitrary_bytes(b'\x91\x80', '') client.assert_receive_close( client_for_testing.STATUS_UNSUPPORTED_DATA) self._run_test(test_function) def test_close_on_invalid_frame(self): """Tests that the server sends a close frame with invalid frame payload data status code when the client sends an invalid frame like containing invalid UTF-8 character. """ def test_function(client): client.connect() # Text frame with invalid UTF-8 string. client.send_message(b'\x80', raw=True) client.assert_receive_close( client_for_testing.STATUS_INVALID_FRAME_PAYLOAD_DATA) self._run_test(test_function) def test_close_on_internal_endpoint_error(self): """Tests that the server sends a close frame with internal endpoint error status code when the handler does bad operation. """ self._options.resource = '/internal_error' def test_function(client): client.connect() client.assert_receive_close( client_for_testing.STATUS_INTERNAL_ENDPOINT_ERROR) self._run_test(test_function) def test_absolute_uri(self): """Tests absolute uri request.""" options = self._options options.resource = 'ws://localhost:%d/echo' % options.server_port self._run_test_with_options(_echo_check_procedure, options) def test_invalid_absolute_uri(self): """Tests invalid absolute uri request.""" options = self._options options.resource = 'ws://invalidlocalhost:%d/echo' % options.server_port options.server_stderr = subprocess.PIPE self._run_http_fallback_test(options, 404) def test_origin_check(self): """Tests http fallback on origin check fail.""" options = self._options options.resource = '/origin_check' # Server shows warning message for http 403 fallback. This warning # message is confusing. Following pipe disposes warning messages. self.server_stderr = subprocess.PIPE self._run_http_fallback_test(options, 403) def test_invalid_resource(self): """Tests invalid resource path.""" options = self._options options.resource = '/no_resource' self.server_stderr = subprocess.PIPE self._run_http_fallback_test(options, 404) def test_fragmentized_resource(self): """Tests resource name with fragment""" options = self._options options.resource = '/echo#fragment' self.server_stderr = subprocess.PIPE self._run_http_fallback_test(options, 400) def test_version_check(self): """Tests http fallback on version check fail.""" options = self._options options.version = 99 self._run_http_fallback_test(options, 400) def test_basic_auth_connection(self): """Test successful basic auth""" options = self._options options.use_basic_auth = True self.server_stderr = subprocess.PIPE self._run_test_with_options(_check_handshake_with_basic_auth, options, server_options=['--basic-auth']) def test_invalid_basic_auth_connection(self): """Tests basic auth with invalid credentials""" options = self._options options.use_basic_auth = True options.basic_auth_credential = 'invalid:test' self.server_stderr = subprocess.PIPE with self.assertRaises(client_for_testing.HttpStatusException) as e: self._run_test_with_options(_check_handshake_with_basic_auth, options, server_options=['--basic-auth']) self.assertEqual(101, e.exception.status) class EndToEndTestWithEchoClient(EndToEndTestBase): def setUp(self): EndToEndTestBase.setUp(self) def _check_example_echo_client_result(self, expected, stdoutdata, stderrdata): actual = stdoutdata.decode(locale.getpreferredencoding()) # In Python 3 on Windows we get "\r\n" terminators back from # the subprocess and we need to replace them with "\n" to get # a match. This is a bit of a hack, but avoids platform- and # version- specific code. actual = actual.replace('\r\n', '\n') if actual != expected: raise Exception('Unexpected result on example echo client: ' '%r (expected) vs %r (actual)' % (expected, actual)) if stderrdata is not None: raise Exception('Unexpected error message on example echo ' 'client: %r' % stderrdata) def test_example_echo_client(self): """Tests that the echo_client.py example can talk with the server.""" server = self._run_server() try: time.sleep(_SERVER_WARMUP_IN_SEC) client_command = os.path.join(self.top_dir, 'example', 'echo_client.py') # Expected output for the default messages. default_expectation = (u'Send: Hello\n' u'Recv: Hello\n' u'Send: <>\n' u'Recv: <>\n' u'Send close\n' u'Recv ack\n') args = [client_command, '-p', str(self._options.server_port)] client = self._run_python_command(args, stdout=subprocess.PIPE) stdoutdata, stderrdata = client.communicate() self._check_example_echo_client_result(default_expectation, stdoutdata, stderrdata) # Process a big message for which extended payload length is used. # To handle extended payload length, ws_version attribute will be # accessed. This test checks that ws_version is correctly set. big_message = 'a' * 1024 args = [ client_command, '-p', str(self._options.server_port), '-m', big_message ] client = self._run_python_command(args, stdout=subprocess.PIPE) stdoutdata, stderrdata = client.communicate() expected = ('Send: %s\nRecv: %s\nSend close\nRecv ack\n' % (big_message, big_message)) self._check_example_echo_client_result(expected, stdoutdata, stderrdata) # Test the permessage-deflate extension. args = [ client_command, '-p', str(self._options.server_port), '--use_permessage_deflate' ] client = self._run_python_command(args, stdout=subprocess.PIPE) stdoutdata, stderrdata = client.communicate() self._check_example_echo_client_result(default_expectation, stdoutdata, stderrdata) finally: self._close_server(server) class EndToEndTestWithCgi(EndToEndTestBase): def setUp(self): EndToEndTestBase.setUp(self) def test_cgi(self): """Verifies that CGI scripts work.""" server = self._run_server(extra_args=['--cgi-paths', '/cgi-bin']) time.sleep(_SERVER_WARMUP_IN_SEC) url = 'http://localhost:%d/cgi-bin/hi.py' % self._options.server_port # urlopen() in Python 2.7 doesn't support "with". try: f = urllib.request.urlopen(url) except: self._close_server(server) raise try: self.assertEqual(f.getcode(), 200) self.assertEqual(f.info().get('Content-Type'), 'text/plain') body = f.read() self.assertEqual(body.rstrip(b'\r\n'), b'Hi from hi.py') finally: f.close() self._close_server(server) if __name__ == '__main__': unittest.main() # vi:sts=4 sw=4 et