summaryrefslogtreecommitdiffstats
path: root/testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py
parentInitial commit. (diff)
downloadfirefox-upstream.tar.xz
firefox-upstream.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py')
-rw-r--r--testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py954
1 files changed, 954 insertions, 0 deletions
diff --git a/testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py b/testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py
new file mode 100644
index 0000000000..99095224cc
--- /dev/null
+++ b/testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py
@@ -0,0 +1,954 @@
+# Copyright 2011, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""This file provides classes and helper functions for parsing/building frames
+of the WebSocket protocol (RFC 6455).
+
+Specification:
+http://tools.ietf.org/html/rfc6455
+"""
+
+from __future__ import absolute_import, division
+
+from collections import deque
+import logging
+import os
+import struct
+import time
+import socket
+import six
+
+from mod_pywebsocket import common
+from mod_pywebsocket import util
+from mod_pywebsocket._stream_exceptions import BadOperationException
+from mod_pywebsocket._stream_exceptions import ConnectionTerminatedException
+from mod_pywebsocket._stream_exceptions import InvalidFrameException
+from mod_pywebsocket._stream_exceptions import InvalidUTF8Exception
+from mod_pywebsocket._stream_exceptions import UnsupportedFrameException
+
+_NOOP_MASKER = util.NoopMasker()
+
+
+class Frame(object):
+ def __init__(self,
+ fin=1,
+ rsv1=0,
+ rsv2=0,
+ rsv3=0,
+ opcode=None,
+ payload=b''):
+ self.fin = fin
+ self.rsv1 = rsv1
+ self.rsv2 = rsv2
+ self.rsv3 = rsv3
+ self.opcode = opcode
+ self.payload = payload
+
+
+# Helper functions made public to be used for writing unittests for WebSocket
+# clients.
+
+
+def create_length_header(length, mask):
+ """Creates a length header.
+
+ Args:
+ length: Frame length. Must be less than 2^63.
+ mask: Mask bit. Must be boolean.
+
+ Raises:
+ ValueError: when bad data is given.
+ """
+
+ if mask:
+ mask_bit = 1 << 7
+ else:
+ mask_bit = 0
+
+ if length < 0:
+ raise ValueError('length must be non negative integer')
+ elif length <= 125:
+ return util.pack_byte(mask_bit | length)
+ elif length < (1 << 16):
+ return util.pack_byte(mask_bit | 126) + struct.pack('!H', length)
+ elif length < (1 << 63):
+ return util.pack_byte(mask_bit | 127) + struct.pack('!Q', length)
+ else:
+ raise ValueError('Payload is too big for one frame')
+
+
+def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
+ """Creates a frame header.
+
+ Raises:
+ Exception: when bad data is given.
+ """
+
+ if opcode < 0 or 0xf < opcode:
+ raise ValueError('Opcode out of range')
+
+ if payload_length < 0 or (1 << 63) <= payload_length:
+ raise ValueError('payload_length out of range')
+
+ if (fin | rsv1 | rsv2 | rsv3) & ~1:
+ raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
+
+ header = b''
+
+ first_byte = ((fin << 7)
+ | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
+ | opcode)
+ header += util.pack_byte(first_byte)
+ header += create_length_header(payload_length, mask)
+
+ return header
+
+
+def _build_frame(header, body, mask):
+ if not mask:
+ return header + body
+
+ masking_nonce = os.urandom(4)
+ masker = util.RepeatedXorMasker(masking_nonce)
+
+ return header + masking_nonce + masker.mask(body)
+
+
+def _filter_and_format_frame_object(frame, mask, frame_filters):
+ for frame_filter in frame_filters:
+ frame_filter.filter(frame)
+
+ header = create_header(frame.opcode, len(frame.payload), frame.fin,
+ frame.rsv1, frame.rsv2, frame.rsv3, mask)
+ return _build_frame(header, frame.payload, mask)
+
+
+def create_binary_frame(message,
+ opcode=common.OPCODE_BINARY,
+ fin=1,
+ mask=False,
+ frame_filters=[]):
+ """Creates a simple binary frame with no extension, reserved bit."""
+
+ frame = Frame(fin=fin, opcode=opcode, payload=message)
+ return _filter_and_format_frame_object(frame, mask, frame_filters)
+
+
+def create_text_frame(message,
+ opcode=common.OPCODE_TEXT,
+ fin=1,
+ mask=False,
+ frame_filters=[]):
+ """Creates a simple text frame with no extension, reserved bit."""
+
+ encoded_message = message.encode('utf-8')
+ return create_binary_frame(encoded_message, opcode, fin, mask,
+ frame_filters)
+
+
+def parse_frame(receive_bytes,
+ logger=None,
+ ws_version=common.VERSION_HYBI_LATEST,
+ unmask_receive=True):
+ """Parses a frame. Returns a tuple containing each header field and
+ payload.
+
+ Args:
+ receive_bytes: a function that reads frame data from a stream or
+ something similar. The function takes length of the bytes to be
+ read. The function must raise ConnectionTerminatedException if
+ there is not enough data to be read.
+ logger: a logging object.
+ ws_version: the version of WebSocket protocol.
+ unmask_receive: unmask received frames. When received unmasked
+ frame, raises InvalidFrameException.
+
+ Raises:
+ ConnectionTerminatedException: when receive_bytes raises it.
+ InvalidFrameException: when the frame contains invalid data.
+ """
+
+ if not logger:
+ logger = logging.getLogger()
+
+ logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')
+
+ first_byte = ord(receive_bytes(1))
+ fin = (first_byte >> 7) & 1
+ rsv1 = (first_byte >> 6) & 1
+ rsv2 = (first_byte >> 5) & 1
+ rsv3 = (first_byte >> 4) & 1
+ opcode = first_byte & 0xf
+
+ second_byte = ord(receive_bytes(1))
+ mask = (second_byte >> 7) & 1
+ payload_length = second_byte & 0x7f
+
+ logger.log(
+ common.LOGLEVEL_FINE, 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
+ 'Mask=%s, Payload_length=%s', fin, rsv1, rsv2, rsv3, opcode, mask,
+ payload_length)
+
+ if (mask == 1) != unmask_receive:
+ raise InvalidFrameException(
+ 'Mask bit on the received frame did\'nt match masking '
+ 'configuration for received frames')
+
+ # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
+ # into the 8-octet extended payload length field (or 0x0-0xFD in
+ # 2-octet field).
+ valid_length_encoding = True
+ length_encoding_bytes = 1
+ if payload_length == 127:
+ logger.log(common.LOGLEVEL_FINE,
+ 'Receive 8-octet extended payload length')
+
+ extended_payload_length = receive_bytes(8)
+ payload_length = struct.unpack('!Q', extended_payload_length)[0]
+ if payload_length > 0x7FFFFFFFFFFFFFFF:
+ raise InvalidFrameException('Extended payload length >= 2^63')
+ if ws_version >= 13 and payload_length < 0x10000:
+ valid_length_encoding = False
+ length_encoding_bytes = 8
+
+ logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
+ payload_length)
+ elif payload_length == 126:
+ logger.log(common.LOGLEVEL_FINE,
+ 'Receive 2-octet extended payload length')
+
+ extended_payload_length = receive_bytes(2)
+ payload_length = struct.unpack('!H', extended_payload_length)[0]
+ if ws_version >= 13 and payload_length < 126:
+ valid_length_encoding = False
+ length_encoding_bytes = 2
+
+ logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
+ payload_length)
+
+ if not valid_length_encoding:
+ logger.warning(
+ 'Payload length is not encoded using the minimal number of '
+ 'bytes (%d is encoded using %d bytes)', payload_length,
+ length_encoding_bytes)
+
+ if mask == 1:
+ logger.log(common.LOGLEVEL_FINE, 'Receive mask')
+
+ masking_nonce = receive_bytes(4)
+ masker = util.RepeatedXorMasker(masking_nonce)
+
+ logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
+ else:
+ masker = _NOOP_MASKER
+
+ logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
+ if logger.isEnabledFor(common.LOGLEVEL_FINE):
+ receive_start = time.time()
+
+ raw_payload_bytes = receive_bytes(payload_length)
+
+ if logger.isEnabledFor(common.LOGLEVEL_FINE):
+ # pylint --py3k W1619
+ logger.log(
+ common.LOGLEVEL_FINE, 'Done receiving payload data at %s MB/s',
+ payload_length / (time.time() - receive_start) / 1000 / 1000)
+ logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')
+
+ if logger.isEnabledFor(common.LOGLEVEL_FINE):
+ unmask_start = time.time()
+
+ unmasked_bytes = masker.mask(raw_payload_bytes)
+
+ if logger.isEnabledFor(common.LOGLEVEL_FINE):
+ # pylint --py3k W1619
+ logger.log(common.LOGLEVEL_FINE,
+ 'Done unmasking payload data at %s MB/s',
+ payload_length / (time.time() - unmask_start) / 1000 / 1000)
+
+ return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3
+
+
+class FragmentedFrameBuilder(object):
+ """A stateful class to send a message as fragments."""
+ def __init__(self, mask, frame_filters=[], encode_utf8=True):
+ """Constructs an instance."""
+
+ self._mask = mask
+ self._frame_filters = frame_filters
+ # This is for skipping UTF-8 encoding when building text type frames
+ # from compressed data.
+ self._encode_utf8 = encode_utf8
+
+ self._started = False
+
+ # Hold opcode of the first frame in messages to verify types of other
+ # frames in the message are all the same.
+ self._opcode = common.OPCODE_TEXT
+
+ def build(self, payload_data, end, binary):
+ if binary:
+ frame_type = common.OPCODE_BINARY
+ else:
+ frame_type = common.OPCODE_TEXT
+ if self._started:
+ if self._opcode != frame_type:
+ raise ValueError('Message types are different in frames for '
+ 'the same message')
+ opcode = common.OPCODE_CONTINUATION
+ else:
+ opcode = frame_type
+ self._opcode = frame_type
+
+ if end:
+ self._started = False
+ fin = 1
+ else:
+ self._started = True
+ fin = 0
+
+ if binary or not self._encode_utf8:
+ return create_binary_frame(payload_data, opcode, fin, self._mask,
+ self._frame_filters)
+ else:
+ return create_text_frame(payload_data, opcode, fin, self._mask,
+ self._frame_filters)
+
+
+def _create_control_frame(opcode, body, mask, frame_filters):
+ frame = Frame(opcode=opcode, payload=body)
+
+ for frame_filter in frame_filters:
+ frame_filter.filter(frame)
+
+ if len(frame.payload) > 125:
+ raise BadOperationException(
+ 'Payload data size of control frames must be 125 bytes or less')
+
+ header = create_header(frame.opcode, len(frame.payload), frame.fin,
+ frame.rsv1, frame.rsv2, frame.rsv3, mask)
+ return _build_frame(header, frame.payload, mask)
+
+
+def create_ping_frame(body, mask=False, frame_filters=[]):
+ return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
+
+
+def create_pong_frame(body, mask=False, frame_filters=[]):
+ return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
+
+
+def create_close_frame(body, mask=False, frame_filters=[]):
+ return _create_control_frame(common.OPCODE_CLOSE, body, mask,
+ frame_filters)
+
+
+def create_closing_handshake_body(code, reason):
+ body = b''
+ if code is not None:
+ if (code > common.STATUS_USER_PRIVATE_MAX
+ or code < common.STATUS_NORMAL_CLOSURE):
+ raise BadOperationException('Status code is out of range')
+ if (code == common.STATUS_NO_STATUS_RECEIVED
+ or code == common.STATUS_ABNORMAL_CLOSURE
+ or code == common.STATUS_TLS_HANDSHAKE):
+ raise BadOperationException('Status code is reserved pseudo '
+ 'code')
+ encoded_reason = reason.encode('utf-8')
+ body = struct.pack('!H', code) + encoded_reason
+ return body
+
+
+class StreamOptions(object):
+ """Holds option values to configure Stream objects."""
+ def __init__(self):
+ """Constructs StreamOptions."""
+
+ # Filters applied to frames.
+ self.outgoing_frame_filters = []
+ self.incoming_frame_filters = []
+
+ # Filters applied to messages. Control frames are not affected by them.
+ self.outgoing_message_filters = []
+ self.incoming_message_filters = []
+
+ self.encode_text_message_to_utf8 = True
+ self.mask_send = False
+ self.unmask_receive = True
+
+
+class Stream(object):
+ """A class for parsing/building frames of the WebSocket protocol
+ (RFC 6455).
+ """
+ def __init__(self, request, options):
+ """Constructs an instance.
+
+ Args:
+ request: mod_python request.
+ """
+
+ self._logger = util.get_class_logger(self)
+
+ self._options = options
+ self._request = request
+
+ self._request.client_terminated = False
+ self._request.server_terminated = False
+
+ # Holds body of received fragments.
+ self._received_fragments = []
+ # Holds the opcode of the first fragment.
+ self._original_opcode = None
+
+ self._writer = FragmentedFrameBuilder(
+ self._options.mask_send, self._options.outgoing_frame_filters,
+ self._options.encode_text_message_to_utf8)
+
+ self._ping_queue = deque()
+
+ def _read(self, length):
+ """Reads length bytes from connection. In case we catch any exception,
+ prepends remote address to the exception message and raise again.
+
+ Raises:
+ ConnectionTerminatedException: when read returns empty string.
+ """
+
+ try:
+ read_bytes = self._request.connection.read(length)
+ if not read_bytes:
+ raise ConnectionTerminatedException(
+ 'Receiving %d byte failed. Peer (%r) closed connection' %
+ (length, (self._request.connection.remote_addr, )))
+ return read_bytes
+ except IOError as e:
+ # Also catch an IOError because mod_python throws it.
+ raise ConnectionTerminatedException(
+ 'Receiving %d byte failed. IOError (%s) occurred' %
+ (length, e))
+
+ def _write(self, bytes_to_write):
+ """Writes given bytes to connection. In case we catch any exception,
+ prepends remote address to the exception message and raise again.
+ """
+
+ try:
+ self._request.connection.write(bytes_to_write)
+ except Exception as e:
+ util.prepend_message_to_exception(
+ 'Failed to send message to %r: ' %
+ (self._request.connection.remote_addr, ), e)
+ raise
+
+ def receive_bytes(self, length):
+ """Receives multiple bytes. Retries read when we couldn't receive the
+ specified amount. This method returns byte strings.
+
+ Raises:
+ ConnectionTerminatedException: when read returns empty string.
+ """
+
+ read_bytes = []
+ while length > 0:
+ new_read_bytes = self._read(length)
+ read_bytes.append(new_read_bytes)
+ length -= len(new_read_bytes)
+ return b''.join(read_bytes)
+
+ def _read_until(self, delim_char):
+ """Reads bytes until we encounter delim_char. The result will not
+ contain delim_char.
+
+ Raises:
+ ConnectionTerminatedException: when read returns empty string.
+ """
+
+ read_bytes = []
+ while True:
+ ch = self._read(1)
+ if ch == delim_char:
+ break
+ read_bytes.append(ch)
+ return b''.join(read_bytes)
+
+ def _receive_frame(self):
+ """Receives a frame and return data in the frame as a tuple containing
+ each header field and payload separately.
+
+ Raises:
+ ConnectionTerminatedException: when read returns empty
+ string.
+ InvalidFrameException: when the frame contains invalid data.
+ """
+ def _receive_bytes(length):
+ return self.receive_bytes(length)
+
+ return parse_frame(receive_bytes=_receive_bytes,
+ logger=self._logger,
+ ws_version=self._request.ws_version,
+ unmask_receive=self._options.unmask_receive)
+
+ def _receive_frame_as_frame_object(self):
+ opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
+
+ return Frame(fin=fin,
+ rsv1=rsv1,
+ rsv2=rsv2,
+ rsv3=rsv3,
+ opcode=opcode,
+ payload=unmasked_bytes)
+
+ def receive_filtered_frame(self):
+ """Receives a frame and applies frame filters and message filters.
+ The frame to be received must satisfy following conditions:
+ - The frame is not fragmented.
+ - The opcode of the frame is TEXT or BINARY.
+
+ DO NOT USE this method except for testing purpose.
+ """
+
+ frame = self._receive_frame_as_frame_object()
+ if not frame.fin:
+ raise InvalidFrameException(
+ 'Segmented frames must not be received via '
+ 'receive_filtered_frame()')
+ if (frame.opcode != common.OPCODE_TEXT
+ and frame.opcode != common.OPCODE_BINARY):
+ raise InvalidFrameException(
+ 'Control frames must not be received via '
+ 'receive_filtered_frame()')
+
+ for frame_filter in self._options.incoming_frame_filters:
+ frame_filter.filter(frame)
+ for message_filter in self._options.incoming_message_filters:
+ frame.payload = message_filter.filter(frame.payload)
+ return frame
+
+ def send_message(self, message, end=True, binary=False):
+ """Send message.
+
+ Args:
+ message: text in unicode or binary in str to send.
+ binary: send message as binary frame.
+
+ Raises:
+ BadOperationException: when called on a server-terminated
+ connection or called with inconsistent message type or
+ binary parameter.
+ """
+
+ if self._request.server_terminated:
+ raise BadOperationException(
+ 'Requested send_message after sending out a closing handshake')
+
+ if binary and isinstance(message, six.text_type):
+ raise BadOperationException(
+ 'Message for binary frame must not be instance of Unicode')
+
+ for message_filter in self._options.outgoing_message_filters:
+ message = message_filter.filter(message, end, binary)
+
+ try:
+ # Set this to any positive integer to limit maximum size of data in
+ # payload data of each frame.
+ MAX_PAYLOAD_DATA_SIZE = -1
+
+ if MAX_PAYLOAD_DATA_SIZE <= 0:
+ self._write(self._writer.build(message, end, binary))
+ return
+
+ bytes_written = 0
+ while True:
+ end_for_this_frame = end
+ bytes_to_write = len(message) - bytes_written
+ if (MAX_PAYLOAD_DATA_SIZE > 0
+ and bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
+ end_for_this_frame = False
+ bytes_to_write = MAX_PAYLOAD_DATA_SIZE
+
+ frame = self._writer.build(
+ message[bytes_written:bytes_written + bytes_to_write],
+ end_for_this_frame, binary)
+ self._write(frame)
+
+ bytes_written += bytes_to_write
+
+ # This if must be placed here (the end of while block) so that
+ # at least one frame is sent.
+ if len(message) <= bytes_written:
+ break
+ except ValueError as e:
+ raise BadOperationException(e)
+
+ def _get_message_from_frame(self, frame):
+ """Gets a message from frame. If the message is composed of fragmented
+ frames and the frame is not the last fragmented frame, this method
+ returns None. The whole message will be returned when the last
+ fragmented frame is passed to this method.
+
+ Raises:
+ InvalidFrameException: when the frame doesn't match defragmentation
+ context, or the frame contains invalid data.
+ """
+
+ if frame.opcode == common.OPCODE_CONTINUATION:
+ if not self._received_fragments:
+ if frame.fin:
+ raise InvalidFrameException(
+ 'Received a termination frame but fragmentation '
+ 'not started')
+ else:
+ raise InvalidFrameException(
+ 'Received an intermediate frame but '
+ 'fragmentation not started')
+
+ if frame.fin:
+ # End of fragmentation frame
+ self._received_fragments.append(frame.payload)
+ message = b''.join(self._received_fragments)
+ self._received_fragments = []
+ return message
+ else:
+ # Intermediate frame
+ self._received_fragments.append(frame.payload)
+ return None
+ else:
+ if self._received_fragments:
+ if frame.fin:
+ raise InvalidFrameException(
+ 'Received an unfragmented frame without '
+ 'terminating existing fragmentation')
+ else:
+ raise InvalidFrameException(
+ 'New fragmentation started without terminating '
+ 'existing fragmentation')
+
+ if frame.fin:
+ # Unfragmented frame
+
+ self._original_opcode = frame.opcode
+ return frame.payload
+ else:
+ # Start of fragmentation frame
+
+ if common.is_control_opcode(frame.opcode):
+ raise InvalidFrameException(
+ 'Control frames must not be fragmented')
+
+ self._original_opcode = frame.opcode
+ self._received_fragments.append(frame.payload)
+ return None
+
+ def _process_close_message(self, message):
+ """Processes close message.
+
+ Args:
+ message: close message.
+
+ Raises:
+ InvalidFrameException: when the message is invalid.
+ """
+
+ self._request.client_terminated = True
+
+ # Status code is optional. We can have status reason only if we
+ # have status code. Status reason can be empty string. So,
+ # allowed cases are
+ # - no application data: no code no reason
+ # - 2 octet of application data: has code but no reason
+ # - 3 or more octet of application data: both code and reason
+ if len(message) == 0:
+ self._logger.debug('Received close frame (empty body)')
+ self._request.ws_close_code = common.STATUS_NO_STATUS_RECEIVED
+ elif len(message) == 1:
+ raise InvalidFrameException(
+ 'If a close frame has status code, the length of '
+ 'status code must be 2 octet')
+ elif len(message) >= 2:
+ self._request.ws_close_code = struct.unpack('!H', message[0:2])[0]
+ self._request.ws_close_reason = message[2:].decode(
+ 'utf-8', 'replace')
+ self._logger.debug('Received close frame (code=%d, reason=%r)',
+ self._request.ws_close_code,
+ self._request.ws_close_reason)
+
+ # As we've received a close frame, no more data is coming over the
+ # socket. We can now safely close the socket without worrying about
+ # RST sending.
+
+ if self._request.server_terminated:
+ self._logger.debug(
+ 'Received ack for server-initiated closing handshake')
+ return
+
+ self._logger.debug('Received client-initiated closing handshake')
+
+ code = common.STATUS_NORMAL_CLOSURE
+ reason = ''
+ if hasattr(self._request, '_dispatcher'):
+ dispatcher = self._request._dispatcher
+ code, reason = dispatcher.passive_closing_handshake(self._request)
+ if code is None and reason is not None and len(reason) > 0:
+ self._logger.warning(
+ 'Handler specified reason despite code being None')
+ reason = ''
+ if reason is None:
+ reason = ''
+ self._send_closing_handshake(code, reason)
+ self._logger.debug(
+ 'Acknowledged closing handshake initiated by the peer '
+ '(code=%r, reason=%r)', code, reason)
+
+ def _process_ping_message(self, message):
+ """Processes ping message.
+
+ Args:
+ message: ping message.
+ """
+
+ try:
+ handler = self._request.on_ping_handler
+ if handler:
+ handler(self._request, message)
+ return
+ except AttributeError:
+ pass
+ self._send_pong(message)
+
+ def _process_pong_message(self, message):
+ """Processes pong message.
+
+ Args:
+ message: pong message.
+ """
+
+ # TODO(tyoshino): Add ping timeout handling.
+
+ inflight_pings = deque()
+
+ while True:
+ try:
+ expected_body = self._ping_queue.popleft()
+ if expected_body == message:
+ # inflight_pings contains pings ignored by the
+ # other peer. Just forget them.
+ self._logger.debug(
+ 'Ping %r is acked (%d pings were ignored)',
+ expected_body, len(inflight_pings))
+ break
+ else:
+ inflight_pings.append(expected_body)
+ except IndexError:
+ # The received pong was unsolicited pong. Keep the
+ # ping queue as is.
+ self._ping_queue = inflight_pings
+ self._logger.debug('Received a unsolicited pong')
+ break
+
+ try:
+ handler = self._request.on_pong_handler
+ if handler:
+ handler(self._request, message)
+ except AttributeError:
+ pass
+
+ def receive_message(self):
+ """Receive a WebSocket frame and return its payload as a text in
+ unicode or a binary in str.
+
+ Returns:
+ payload data of the frame
+ - as unicode instance if received text frame
+ - as str instance if received binary frame
+ or None iff received closing handshake.
+ Raises:
+ BadOperationException: when called on a client-terminated
+ connection.
+ ConnectionTerminatedException: when read returns empty
+ string.
+ InvalidFrameException: when the frame contains invalid
+ data.
+ UnsupportedFrameException: when the received frame has
+ flags, opcode we cannot handle. You can ignore this
+ exception and continue receiving the next frame.
+ """
+
+ if self._request.client_terminated:
+ raise BadOperationException(
+ 'Requested receive_message after receiving a closing '
+ 'handshake')
+
+ while True:
+ # mp_conn.read will block if no bytes are available.
+
+ frame = self._receive_frame_as_frame_object()
+
+ # Check the constraint on the payload size for control frames
+ # before extension processes the frame.
+ # See also http://tools.ietf.org/html/rfc6455#section-5.5
+ if (common.is_control_opcode(frame.opcode)
+ and len(frame.payload) > 125):
+ raise InvalidFrameException(
+ 'Payload data size of control frames must be 125 bytes or '
+ 'less')
+
+ for frame_filter in self._options.incoming_frame_filters:
+ frame_filter.filter(frame)
+
+ if frame.rsv1 or frame.rsv2 or frame.rsv3:
+ raise UnsupportedFrameException(
+ 'Unsupported flag is set (rsv = %d%d%d)' %
+ (frame.rsv1, frame.rsv2, frame.rsv3))
+
+ message = self._get_message_from_frame(frame)
+ if message is None:
+ continue
+
+ for message_filter in self._options.incoming_message_filters:
+ message = message_filter.filter(message)
+
+ if self._original_opcode == common.OPCODE_TEXT:
+ # The WebSocket protocol section 4.4 specifies that invalid
+ # characters must be replaced with U+fffd REPLACEMENT
+ # CHARACTER.
+ try:
+ return message.decode('utf-8')
+ except UnicodeDecodeError as e:
+ raise InvalidUTF8Exception(e)
+ elif self._original_opcode == common.OPCODE_BINARY:
+ return message
+ elif self._original_opcode == common.OPCODE_CLOSE:
+ self._process_close_message(message)
+ return None
+ elif self._original_opcode == common.OPCODE_PING:
+ self._process_ping_message(message)
+ elif self._original_opcode == common.OPCODE_PONG:
+ self._process_pong_message(message)
+ else:
+ raise UnsupportedFrameException('Opcode %d is not supported' %
+ self._original_opcode)
+
+ def _send_closing_handshake(self, code, reason):
+ body = create_closing_handshake_body(code, reason)
+ frame = create_close_frame(
+ body,
+ mask=self._options.mask_send,
+ frame_filters=self._options.outgoing_frame_filters)
+
+ self._request.server_terminated = True
+
+ self._write(frame)
+
+ def close_connection(self,
+ code=common.STATUS_NORMAL_CLOSURE,
+ reason='',
+ wait_response=True):
+ """Closes a WebSocket connection. Note that this method blocks until
+ it receives acknowledgement to the closing handshake.
+
+ Args:
+ code: Status code for close frame. If code is None, a close
+ frame with empty body will be sent.
+ reason: string representing close reason.
+ wait_response: True when caller want to wait the response.
+ Raises:
+ BadOperationException: when reason is specified with code None
+ or reason is not an instance of both str and unicode.
+ """
+
+ if self._request.server_terminated:
+ self._logger.debug(
+ 'Requested close_connection but server is already terminated')
+ return
+
+ # When we receive a close frame, we call _process_close_message().
+ # _process_close_message() immediately acknowledges to the
+ # server-initiated closing handshake and sets server_terminated to
+ # True. So, here we can assume that we haven't received any close
+ # frame. We're initiating a closing handshake.
+
+ if code is None:
+ if reason is not None and len(reason) > 0:
+ raise BadOperationException(
+ 'close reason must not be specified if code is None')
+ reason = ''
+ else:
+ if not isinstance(reason, bytes) and not isinstance(
+ reason, six.text_type):
+ raise BadOperationException(
+ 'close reason must be an instance of bytes or unicode')
+
+ self._send_closing_handshake(code, reason)
+ self._logger.debug('Initiated closing handshake (code=%r, reason=%r)',
+ code, reason)
+
+ if (code == common.STATUS_GOING_AWAY
+ or code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
+ # It doesn't make sense to wait for a close frame if the reason is
+ # protocol error or that the server is going away. For some of
+ # other reasons, it might not make sense to wait for a close frame,
+ # but it's not clear, yet.
+ return
+
+ # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
+ # or until a server-defined timeout expires.
+ #
+ # For now, we expect receiving closing handshake right after sending
+ # out closing handshake.
+ message = self.receive_message()
+ if message is not None:
+ raise ConnectionTerminatedException(
+ 'Didn\'t receive valid ack for closing handshake')
+ # TODO: 3. close the WebSocket connection.
+ # note: mod_python Connection (mp_conn) doesn't have close method.
+
+ def send_ping(self, body, binary=False):
+ if not binary and isinstance(body, six.text_type):
+ body = body.encode('UTF-8')
+ frame = create_ping_frame(body, self._options.mask_send,
+ self._options.outgoing_frame_filters)
+ self._write(frame)
+
+ self._ping_queue.append(body)
+
+ def _send_pong(self, body):
+ frame = create_pong_frame(body, self._options.mask_send,
+ self._options.outgoing_frame_filters)
+ self._write(frame)
+
+ def get_last_received_opcode(self):
+ """Returns the opcode of the WebSocket message which the last received
+ frame belongs to. The return value is valid iff immediately after
+ receive_message call.
+ """
+
+ return self._original_opcode
+
+
+# vi:sts=4 sw=4 et