summaryrefslogtreecommitdiffstats
path: root/third_party/rust/audioipc2/src/codec.rs
blob: eb172fbc51cae3514fd56f2e6fe9f05fa691777a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.

// The assert in LengthDelimitedCodec::decode triggers this clippy warning but
// requires upgrading the workspace to Rust 2021 to resolve.
// This should be fixed in Rust 1.68, after which the following `allow` can be deleted.
#![allow(clippy::uninlined_format_args)]

use bincode::Options;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, BytesMut};
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use std::convert::TryInto;
use std::fmt::Debug;
use std::io;
use std::marker::PhantomData;
use std::mem::size_of;

////////////////////////////////////////////////////////////////////////////////
// Split buffer into size delimited frames - This appears more complicated than
// might be necessary due to handling the possibility of messages being split
// across reads.

/// Framing contract between typed items and a `BytesMut` byte buffer.
pub trait Codec {
    /// Item type consumed by `encode`.
    type In;

    /// Item type produced by `decode`.
    type Out;

    /// Tries to decode a single frame out of `buf`.
    ///
    /// `Ok(None)` means no frame was produced from the bytes currently in
    /// `buf`; the default `decode_eof` turns that outcome into an error.
    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>>;

    /// Hook for callers to use once the I/O source has no more bytes to offer.
    ///
    /// The default implementation makes one more `decode` attempt. A decoded
    /// frame is returned as-is; if `decode` yields `None`, an
    /// `io::ErrorKind::Other` error is returned instead.
    ///
    /// # Errors
    ///
    /// Propagates any error from `decode`, and fails with
    /// "bytes remaining on stream" when no frame could be decoded.
    fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
        self.decode(buf)?.ok_or_else(|| {
            io::Error::new(io::ErrorKind::Other, "bytes remaining on stream")
        })
    }

    /// Serializes `msg` and appends the resulting frame to `buf`.
    fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>;
}

/// Codec based upon bincode serialization
///
/// Messages that have been serialized using bincode are prefixed with
/// a fixed header (a magic value followed by the length of the message)
/// to aid in deserialization, so that it's known if enough data has been
/// received to decode a complete message.
///
/// `In` is the item type accepted by `encode`; `Out` is the item type
/// produced by `decode`.
pub struct LengthDelimitedCodec<In, Out> {
    /// Current decoder phase: waiting for a header, or waiting for a payload
    /// whose length has already been read.
    state: State,
    /// Scratch buffer that `encode` serializes into before the frame is
    /// copied to the output buffer; cleared and reused on every call.
    encode_buf: Vec<u8>,
    /// Zero-sized marker binding the codec to its encode item type.
    __in: PhantomData<In>,
    /// Zero-sized marker binding the codec to its decode item type.
    __out: PhantomData<Out>,
}

/// Decoder phase, kept between `decode` calls so that a frame split across
/// several reads can be resumed where the previous call stopped.
enum State {
    /// Waiting for a complete frame header (magic value plus length).
    Length,
    /// The header has been consumed; waiting for a payload of this many bytes.
    Data(u32),
}

// Largest payload length (in bytes, 1 MiB) accepted when decoding a header
// and when encoding a message; also used as the bincode size limit.
const MAX_MESSAGE_LEN: u32 = 1024 * 1024;
// Marker written at the start of every frame header and checked on decode.
const MAGIC: u64 = 0xa4d1_019c_c910_1d4a;
// Size of the frame header in bytes: the u64 magic value followed by the u32
// payload length.
const HEADER_LEN: usize = size_of::<u32>() + size_of::<u64>();

impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
    fn default() -> Self {
        Self {
            state: State::Length,
            encode_buf: Vec::with_capacity(crate::ipccore::IPC_CLIENT_BUFFER_SIZE),
            __in: PhantomData,
            __out: PhantomData,
        }
    }
}

impl<In, Out> LengthDelimitedCodec<In, Out> {
    // Lengths are encoded as little endian u32
    fn decode_length(buf: &mut BytesMut) -> Option<u32> {
        if buf.len() < HEADER_LEN {
            // Not enough data
            return None;
        }

        let magic = LittleEndian::read_u64(&buf[0..8]);
        assert_eq!(magic, MAGIC);

        // Consume the length field
        let n = LittleEndian::read_u32(&buf[8..12]);
        buf.advance(HEADER_LEN);
        Some(n)
    }

    fn decode_data(buf: &mut BytesMut, n: u32) -> io::Result<Option<Out>>
    where
        Out: DeserializeOwned + Debug,
    {
        let n = n.try_into().unwrap();

        // At this point, the buffer has already had the required capacity
        // reserved. All there is to do is read.
        if buf.len() < n {
            return Ok(None);
        }

        trace!("Attempting to decode");
        let msg = bincode::options()
            .with_limit(MAX_MESSAGE_LEN as u64)
            .deserialize::<Out>(&buf[..n])
            .map_err(|e| match *e {
                bincode::ErrorKind::Io(e) => e,
                _ => io::Error::new(io::ErrorKind::Other, *e),
            })?;
        buf.advance(n);

        trace!("... Decoded {:?}", msg);
        Ok(Some(msg))
    }
}

impl<In, Out> Codec for LengthDelimitedCodec<In, Out>
where
    In: Serialize + Debug,
    Out: DeserializeOwned + Debug,
{
    type In = In;
    type Out = Out;

    /// Decodes at most one message from `buf`, resuming from the phase
    /// recorded by the previous call.
    ///
    /// Returns `Ok(None)` when the header or the payload has not been fully
    /// received yet.
    ///
    /// # Panics
    ///
    /// Panics if the header's magic value is wrong or if the announced
    /// payload length exceeds `MAX_MESSAGE_LEN`.
    ///
    /// # Errors
    ///
    /// Returns an error if the payload cannot be deserialized.
    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
        let payload_len = match self.state {
            // A header was consumed by an earlier call; keep waiting for its payload.
            State::Data(pending) => pending,
            State::Length => {
                let announced = match Self::decode_length(buf) {
                    Some(len) => len,
                    None => return Ok(None),
                };
                assert!(
                    announced <= MAX_MESSAGE_LEN,
                    "assertion failed: {} <= {}",
                    announced,
                    MAX_MESSAGE_LEN
                );
                self.state = State::Data(announced);

                // Make room for the payload that is about to be read.
                buf.reserve(announced.try_into().unwrap());

                announced
            }
        };

        let decoded = Self::decode_data(buf, payload_len)?;
        if decoded.is_some() {
            // Frame complete: go back to expecting a header and make sure the
            // buffer can hold the next one.
            self.state = State::Length;
            buf.reserve(HEADER_LEN);
        }
        Ok(decoded)
    }

    /// Serializes `item` with bincode and appends one frame to `buf`: the
    /// magic value, the payload length, then the payload itself.
    ///
    /// # Panics
    ///
    /// Panics if the serialized payload is longer than `MAX_MESSAGE_LEN`.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails; nothing is appended to `buf`
    /// in that case.
    fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> {
        trace!("Attempting to encode");

        // Serialize into the reusable scratch buffer first so the payload
        // length is known before the header is written.
        self.encode_buf.clear();
        let serialized = bincode::options()
            .with_limit(MAX_MESSAGE_LEN as u64)
            .serialize_into::<_, Self::In>(&mut self.encode_buf, &item);
        if let Err(e) = serialized {
            trace!("message encode failed: {:?}", *e);
            return Err(match *e {
                bincode::ErrorKind::Io(e) => e,
                _ => io::Error::new(io::ErrorKind::Other, *e),
            });
        }

        let encoded_len = self.encode_buf.len();
        assert!(encoded_len <= MAX_MESSAGE_LEN as usize);

        buf.reserve(encoded_len + HEADER_LEN);
        buf.put_u64_le(MAGIC);
        buf.put_u32_le(encoded_len.try_into().unwrap());
        buf.extend_from_slice(&self.encode_buf);

        Ok(())
    }
}