summaryrefslogtreecommitdiffstats
path: root/testing/mochitest/pywebsocket3/mod_pywebsocket/stream.py
blob: 99095224ccd801c7edee99739c6d7dc316ac93cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
# Copyright 2011, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""This file provides classes and helper functions for parsing/building frames
of the WebSocket protocol (RFC 6455).

Specification:
http://tools.ietf.org/html/rfc6455
"""

from __future__ import absolute_import, division

from collections import deque
import logging
import os
import struct
import time
import socket
import six

from mod_pywebsocket import common
from mod_pywebsocket import util
from mod_pywebsocket._stream_exceptions import BadOperationException
from mod_pywebsocket._stream_exceptions import ConnectionTerminatedException
from mod_pywebsocket._stream_exceptions import InvalidFrameException
from mod_pywebsocket._stream_exceptions import InvalidUTF8Exception
from mod_pywebsocket._stream_exceptions import UnsupportedFrameException

_NOOP_MASKER = util.NoopMasker()


class Frame(object):
    def __init__(self,
                 fin=1,
                 rsv1=0,
                 rsv2=0,
                 rsv3=0,
                 opcode=None,
                 payload=b''):
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.payload = payload


# Helper functions made public to be used for writing unittests for WebSocket
# clients.


def create_length_header(length, mask):
    """Creates a length header.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Must be boolean.

    Raises:
        ValueError: when bad data is given.
    """

    if mask:
        mask_bit = 1 << 7
    else:
        mask_bit = 0

    if length < 0:
        raise ValueError('length must be non negative integer')
    elif length <= 125:
        return util.pack_byte(mask_bit | length)
    elif length < (1 << 16):
        return util.pack_byte(mask_bit | 126) + struct.pack('!H', length)
    elif length < (1 << 63):
        return util.pack_byte(mask_bit | 127) + struct.pack('!Q', length)
    else:
        raise ValueError('Payload is too big for one frame')


def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Creates a frame header.

    Raises:
        Exception: when bad data is given.
    """

    if opcode < 0 or 0xf < opcode:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or (1 << 63) <= payload_length:
        raise ValueError('payload_length out of range')

    if (fin | rsv1 | rsv2 | rsv3) & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    header = b''

    first_byte = ((fin << 7)
                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                  | opcode)
    header += util.pack_byte(first_byte)
    header += create_length_header(payload_length, mask)

    return header


def _build_frame(header, body, mask):
    if not mask:
        return header + body

    masking_nonce = os.urandom(4)
    masker = util.RepeatedXorMasker(masking_nonce)

    return header + masking_nonce + masker.mask(body)


def _filter_and_format_frame_object(frame, mask, frame_filters):
    for frame_filter in frame_filters:
        frame_filter.filter(frame)

    header = create_header(frame.opcode, len(frame.payload), frame.fin,
                           frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_binary_frame(message,
                        opcode=common.OPCODE_BINARY,
                        fin=1,
                        mask=False,
                        frame_filters=[]):
    """Creates a simple binary frame with no extension, reserved bit."""

    frame = Frame(fin=fin, opcode=opcode, payload=message)
    return _filter_and_format_frame_object(frame, mask, frame_filters)


def create_text_frame(message,
                      opcode=common.OPCODE_TEXT,
                      fin=1,
                      mask=False,
                      frame_filters=[]):
    """Creates a simple text frame with no extension, reserved bit."""

    encoded_message = message.encode('utf-8')
    return create_binary_frame(encoded_message, opcode, fin, mask,
                               frame_filters)


def parse_frame(receive_bytes,
                logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object.
        ws_version: the version of WebSocket protocol.
        unmask_receive: unmask received frames. When received unmasked
            frame, raises InvalidFrameException.

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if not logger:
        logger = logging.getLogger()

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    first_byte = ord(receive_bytes(1))
    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    second_byte = ord(receive_bytes(1))
    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(
        common.LOGLEVEL_FINE, 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
        'Mask=%s, Payload_length=%s', fin, rsv1, rsv2, rsv3, opcode, mask,
        payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0xFD in
    # 2-octet field).
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack('!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException('Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack('!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)

    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)', payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    if logger.isEnabledFor(common.LOGLEVEL_FINE):
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if logger.isEnabledFor(common.LOGLEVEL_FINE):
        # pylint --py3k W1619
        logger.log(
            common.LOGLEVEL_FINE, 'Done receiving payload data at %s MB/s',
            payload_length / (time.time() - receive_start) / 1000 / 1000)
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if logger.isEnabledFor(common.LOGLEVEL_FINE):
        unmask_start = time.time()

    unmasked_bytes = masker.mask(raw_payload_bytes)

    if logger.isEnabledFor(common.LOGLEVEL_FINE):
        # pylint --py3k W1619
        logger.log(common.LOGLEVEL_FINE,
                   'Done unmasking payload data at %s MB/s',
                   payload_length / (time.time() - unmask_start) / 1000 / 1000)

    return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3


class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""
    def __init__(self, mask, frame_filters=[], encode_utf8=True):
        """Constructs an instance."""

        self._mask = mask
        self._frame_filters = frame_filters
        # This is for skipping UTF-8 encoding when building text type frames
        # from compressed data.
        self._encode_utf8 = encode_utf8

        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, payload_data, end, binary):
        if binary:
            frame_type = common.OPCODE_BINARY
        else:
            frame_type = common.OPCODE_TEXT
        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type
            self._opcode = frame_type

        if end:
            self._started = False
            fin = 1
        else:
            self._started = True
            fin = 0

        if binary or not self._encode_utf8:
            return create_binary_frame(payload_data, opcode, fin, self._mask,
                                       self._frame_filters)
        else:
            return create_text_frame(payload_data, opcode, fin, self._mask,
                                     self._frame_filters)


def _create_control_frame(opcode, body, mask, frame_filters):
    frame = Frame(opcode=opcode, payload=body)

    for frame_filter in frame_filters:
        frame_filter.filter(frame)

    if len(frame.payload) > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    header = create_header(frame.opcode, len(frame.payload), frame.fin,
                           frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_ping_frame(body, mask=False, frame_filters=[]):
    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)


def create_pong_frame(body, mask=False, frame_filters=[]):
    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)


def create_close_frame(body, mask=False, frame_filters=[]):
    return _create_control_frame(common.OPCODE_CLOSE, body, mask,
                                 frame_filters)


def create_closing_handshake_body(code, reason):
    body = b''
    if code is not None:
        if (code > common.STATUS_USER_PRIVATE_MAX
                or code < common.STATUS_NORMAL_CLOSURE):
            raise BadOperationException('Status code is out of range')
        if (code == common.STATUS_NO_STATUS_RECEIVED
                or code == common.STATUS_ABNORMAL_CLOSURE
                or code == common.STATUS_TLS_HANDSHAKE):
            raise BadOperationException('Status code is reserved pseudo '
                                        'code')
        encoded_reason = reason.encode('utf-8')
        body = struct.pack('!H', code) + encoded_reason
    return body


class StreamOptions(object):
    """Holds option values to configure Stream objects."""
    def __init__(self):
        """Constructs StreamOptions."""

        # Filters applied to frames.
        self.outgoing_frame_filters = []
        self.incoming_frame_filters = []

        # Filters applied to messages. Control frames are not affected by them.
        self.outgoing_message_filters = []
        self.incoming_message_filters = []

        self.encode_text_message_to_utf8 = True
        self.mask_send = False
        self.unmask_receive = True


class Stream(object):
    """A class for parsing/building frames of the WebSocket protocol
    (RFC 6455).
    """
    def __init__(self, request, options):
        """Constructs an instance.

        Args:
            request: mod_python request.
        """

        self._logger = util.get_class_logger(self)

        self._options = options
        self._request = request

        self._request.client_terminated = False
        self._request.server_terminated = False

        # Holds body of received fragments.
        self._received_fragments = []
        # Holds the opcode of the first fragment.
        self._original_opcode = None

        self._writer = FragmentedFrameBuilder(
            self._options.mask_send, self._options.outgoing_frame_filters,
            self._options.encode_text_message_to_utf8)

        self._ping_queue = deque()

    def _read(self, length):
        """Reads length bytes from connection. In case we catch any exception,
        prepends remote address to the exception message and raise again.

        Raises:
            ConnectionTerminatedException: when read returns empty string.
        """

        try:
            read_bytes = self._request.connection.read(length)
            if not read_bytes:
                raise ConnectionTerminatedException(
                    'Receiving %d byte failed. Peer (%r) closed connection' %
                    (length, (self._request.connection.remote_addr, )))
            return read_bytes
        except IOError as e:
            # Also catch an IOError because mod_python throws it.
            raise ConnectionTerminatedException(
                'Receiving %d byte failed. IOError (%s) occurred' %
                (length, e))

    def _write(self, bytes_to_write):
        """Writes given bytes to connection. In case we catch any exception,
        prepends remote address to the exception message and raise again.
        """

        try:
            self._request.connection.write(bytes_to_write)
        except Exception as e:
            util.prepend_message_to_exception(
                'Failed to send message to %r: ' %
                (self._request.connection.remote_addr, ), e)
            raise

    def receive_bytes(self, length):
        """Receives multiple bytes. Retries read when we couldn't receive the
        specified amount. This method returns byte strings.

        Raises:
            ConnectionTerminatedException: when read returns empty string.
        """

        read_bytes = []
        while length > 0:
            new_read_bytes = self._read(length)
            read_bytes.append(new_read_bytes)
            length -= len(new_read_bytes)
        return b''.join(read_bytes)

    def _read_until(self, delim_char):
        """Reads bytes until we encounter delim_char. The result will not
        contain delim_char.

        Raises:
            ConnectionTerminatedException: when read returns empty string.
        """

        read_bytes = []
        while True:
            ch = self._read(1)
            if ch == delim_char:
                break
            read_bytes.append(ch)
        return b''.join(read_bytes)

    def _receive_frame(self):
        """Receives a frame and return data in the frame as a tuple containing
        each header field and payload separately.

        Raises:
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid data.
        """
        def _receive_bytes(length):
            return self.receive_bytes(length)

        return parse_frame(receive_bytes=_receive_bytes,
                           logger=self._logger,
                           ws_version=self._request.ws_version,
                           unmask_receive=self._options.unmask_receive)

    def _receive_frame_as_frame_object(self):
        opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()

        return Frame(fin=fin,
                     rsv1=rsv1,
                     rsv2=rsv2,
                     rsv3=rsv3,
                     opcode=opcode,
                     payload=unmasked_bytes)

    def receive_filtered_frame(self):
        """Receives a frame and applies frame filters and message filters.
        The frame to be received must satisfy following conditions:
        - The frame is not fragmented.
        - The opcode of the frame is TEXT or BINARY.

        DO NOT USE this method except for testing purpose.
        """

        frame = self._receive_frame_as_frame_object()
        if not frame.fin:
            raise InvalidFrameException(
                'Segmented frames must not be received via '
                'receive_filtered_frame()')
        if (frame.opcode != common.OPCODE_TEXT
                and frame.opcode != common.OPCODE_BINARY):
            raise InvalidFrameException(
                'Control frames must not be received via '
                'receive_filtered_frame()')

        for frame_filter in self._options.incoming_frame_filters:
            frame_filter.filter(frame)
        for message_filter in self._options.incoming_message_filters:
            frame.payload = message_filter.filter(frame.payload)
        return frame

    def send_message(self, message, end=True, binary=False):
        """Send message.

        Args:
            message: text in unicode or binary in str to send.
            binary: send message as binary frame.

        Raises:
            BadOperationException: when called on a server-terminated
                connection or called with inconsistent message type or
                binary parameter.
        """

        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, six.text_type):
            raise BadOperationException(
                'Message for binary frame must not be instance of Unicode')

        for message_filter in self._options.outgoing_message_filters:
            message = message_filter.filter(message, end, binary)

        try:
            # Set this to any positive integer to limit maximum size of data in
            # payload data of each frame.
            MAX_PAYLOAD_DATA_SIZE = -1

            if MAX_PAYLOAD_DATA_SIZE <= 0:
                self._write(self._writer.build(message, end, binary))
                return

            bytes_written = 0
            while True:
                end_for_this_frame = end
                bytes_to_write = len(message) - bytes_written
                if (MAX_PAYLOAD_DATA_SIZE > 0
                        and bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
                    end_for_this_frame = False
                    bytes_to_write = MAX_PAYLOAD_DATA_SIZE

                frame = self._writer.build(
                    message[bytes_written:bytes_written + bytes_to_write],
                    end_for_this_frame, binary)
                self._write(frame)

                bytes_written += bytes_to_write

                # This if must be placed here (the end of while block) so that
                # at least one frame is sent.
                if len(message) <= bytes_written:
                    break
        except ValueError as e:
            raise BadOperationException(e)

    def _get_message_from_frame(self, frame):
        """Gets a message from frame. If the message is composed of fragmented
        frames and the frame is not the last fragmented frame, this method
        returns None. The whole message will be returned when the last
        fragmented frame is passed to this method.

        Raises:
            InvalidFrameException: when the frame doesn't match defragmentation
                context, or the frame contains invalid data.
        """

        if frame.opcode == common.OPCODE_CONTINUATION:
            if not self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received a termination frame but fragmentation '
                        'not started')
                else:
                    raise InvalidFrameException(
                        'Received an intermediate frame but '
                        'fragmentation not started')

            if frame.fin:
                # End of fragmentation frame
                self._received_fragments.append(frame.payload)
                message = b''.join(self._received_fragments)
                self._received_fragments = []
                return message
            else:
                # Intermediate frame
                self._received_fragments.append(frame.payload)
                return None
        else:
            if self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received an unfragmented frame without '
                        'terminating existing fragmentation')
                else:
                    raise InvalidFrameException(
                        'New fragmentation started without terminating '
                        'existing fragmentation')

            if frame.fin:
                # Unfragmented frame

                self._original_opcode = frame.opcode
                return frame.payload
            else:
                # Start of fragmentation frame

                if common.is_control_opcode(frame.opcode):
                    raise InvalidFrameException(
                        'Control frames must not be fragmented')

                self._original_opcode = frame.opcode
                self._received_fragments.append(frame.payload)
                return None

    def _process_close_message(self, message):
        """Processes close message.

        Args:
            message: close message.

        Raises:
            InvalidFrameException: when the message is invalid.
        """

        self._request.client_terminated = True

        # Status code is optional. We can have status reason only if we
        # have status code. Status reason can be empty string. So,
        # allowed cases are
        # - no application data: no code no reason
        # - 2 octet of application data: has code but no reason
        # - 3 or more octet of application data: both code and reason
        if len(message) == 0:
            self._logger.debug('Received close frame (empty body)')
            self._request.ws_close_code = common.STATUS_NO_STATUS_RECEIVED
        elif len(message) == 1:
            raise InvalidFrameException(
                'If a close frame has status code, the length of '
                'status code must be 2 octet')
        elif len(message) >= 2:
            self._request.ws_close_code = struct.unpack('!H', message[0:2])[0]
            self._request.ws_close_reason = message[2:].decode(
                'utf-8', 'replace')
            self._logger.debug('Received close frame (code=%d, reason=%r)',
                               self._request.ws_close_code,
                               self._request.ws_close_reason)

        # As we've received a close frame, no more data is coming over the
        # socket. We can now safely close the socket without worrying about
        # RST sending.

        if self._request.server_terminated:
            self._logger.debug(
                'Received ack for server-initiated closing handshake')
            return

        self._logger.debug('Received client-initiated closing handshake')

        code = common.STATUS_NORMAL_CLOSURE
        reason = ''
        if hasattr(self._request, '_dispatcher'):
            dispatcher = self._request._dispatcher
            code, reason = dispatcher.passive_closing_handshake(self._request)
            if code is None and reason is not None and len(reason) > 0:
                self._logger.warning(
                    'Handler specified reason despite code being None')
                reason = ''
            if reason is None:
                reason = ''
        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Acknowledged closing handshake initiated by the peer '
            '(code=%r, reason=%r)', code, reason)

    def _process_ping_message(self, message):
        """Processes ping message.

        Args:
            message: ping message.
        """

        try:
            handler = self._request.on_ping_handler
            if handler:
                handler(self._request, message)
                return
        except AttributeError:
            pass
        self._send_pong(message)

    def _process_pong_message(self, message):
        """Processes pong message.

        Args:
            message: pong message.
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while True:
            try:
                expected_body = self._ping_queue.popleft()
                if expected_body == message:
                    # inflight_pings contains pings ignored by the
                    # other peer. Just forget them.
                    self._logger.debug(
                        'Ping %r is acked (%d pings were ignored)',
                        expected_body, len(inflight_pings))
                    break
                else:
                    inflight_pings.append(expected_body)
            except IndexError:
                # The received pong was unsolicited pong. Keep the
                # ping queue as is.
                self._ping_queue = inflight_pings
                self._logger.debug('Received a unsolicited pong')
                break

        try:
            handler = self._request.on_pong_handler
            if handler:
                handler(self._request, message)
        except AttributeError:
            pass

    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.

            frame = self._receive_frame_as_frame_object()

            # Check the constraint on the payload size for control frames
            # before extension processes the frame.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            if (common.is_control_opcode(frame.opcode)
                    and len(frame.payload) > 125):
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            message = self._get_message_from_frame(frame)
            if message is None:
                continue

            for message_filter in self._options.incoming_message_filters:
                message = message_filter.filter(message)

            if self._original_opcode == common.OPCODE_TEXT:
                # The WebSocket protocol section 4.4 specifies that invalid
                # characters must be replaced with U+fffd REPLACEMENT
                # CHARACTER.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException('Opcode %d is not supported' %
                                                self._original_opcode)

    def _send_closing_handshake(self, code, reason):
        body = create_closing_handshake_body(code, reason)
        frame = create_close_frame(
            body,
            mask=self._options.mask_send,
            frame_filters=self._options.outgoing_frame_filters)

        self._request.server_terminated = True

        self._write(frame)

    def close_connection(self,
                         code=common.STATUS_NORMAL_CLOSURE,
                         reason='',
                         wait_response=True):
        """Closes a WebSocket connection. Note that this method blocks until
        it receives acknowledgement to the closing handshake.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
            wait_response: True when caller want to wait the response.
        Raises:
            BadOperationException: when reason is specified with code None
            or reason is not an instance of both str and unicode.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        # When we receive a close frame, we call _process_close_message().
        # _process_close_message() immediately acknowledges to the
        # server-initiated closing handshake and sets server_terminated to
        # True. So, here we can assume that we haven't received any close
        # frame. We're initiating a closing handshake.

        if code is None:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''
        else:
            if not isinstance(reason, bytes) and not isinstance(
                    reason, six.text_type):
                raise BadOperationException(
                    'close reason must be an instance of bytes or unicode')

        self._send_closing_handshake(code, reason)
        self._logger.debug('Initiated closing handshake (code=%r, reason=%r)',
                           code, reason)

        if (code == common.STATUS_GOING_AWAY
                or code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
            # It doesn't make sense to wait for a close frame if the reason is
            # protocol error or that the server is going away. For some of
            # other reasons, it might not make sense to wait for a close frame,
            # but it's not clear, yet.
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake.
        message = self.receive_message()
        if message is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.

    def send_ping(self, body, binary=False):
        if not binary and isinstance(body, six.text_type):
            body = body.encode('UTF-8')
        frame = create_ping_frame(body, self._options.mask_send,
                                  self._options.outgoing_frame_filters)
        self._write(frame)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        frame = create_pong_frame(body, self._options.mask_send,
                                  self._options.outgoing_frame_filters)
        self._write(frame)

    def get_last_received_opcode(self):
        """Returns the opcode of the WebSocket message which the last received
        frame belongs to. The return value is valid iff immediately after
        receive_message call.
        """

        return self._original_opcode


# vi:sts=4 sw=4 et