diff options
Diffstat (limited to 'third_party/rust/audioipc2/src/codec.rs')
-rw-r--r-- | third_party/rust/audioipc2/src/codec.rs | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2/src/codec.rs b/third_party/rust/audioipc2/src/codec.rs new file mode 100644 index 0000000000..2c61faa6ea --- /dev/null +++ b/third_party/rust/audioipc2/src/codec.rs @@ -0,0 +1,203 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers. + +// The assert in LengthDelimitedCodec::decode triggers this clippy warning but +// requires upgrading the workspace to Rust 2021 to resolve. +// This should be fixed in Rust 1.68, after which the following `allow` can be deleted. +#![allow(clippy::uninlined_format_args)] + +use bincode::{self, Options}; +use byteorder::{ByteOrder, LittleEndian}; +use bytes::{Buf, BufMut, BytesMut}; +use serde::de::DeserializeOwned; +use serde::ser::Serialize; +use std::convert::TryInto; +use std::fmt::Debug; +use std::io; +use std::marker::PhantomData; +use std::mem::size_of; + +//////////////////////////////////////////////////////////////////////////////// +// Split buffer into size delimited frames - This appears more complicated than +// might be necessary due to handling the possibility of messages being split +// across reads. + +pub trait Codec { + /// The type of items to be encoded into byte buffer + type In; + + /// The type of items to be returned by decoding from byte buffer + type Out; + + /// Attempts to decode a frame from the provided buffer of bytes. + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the I/O. + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> { + match self.decode(buf)? { + Some(frame) => Ok(frame), + None => Err(io::Error::new( + io::ErrorKind::Other, + "bytes remaining on stream", + )), + } + } + + /// Encodes a frame into the buffer provided. + fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>; +} + +/// Codec based upon bincode serialization +/// +/// Messages that have been serialized using bincode are prefixed with +/// the length of the message to aid in deserialization, so that it's +/// known if enough data has been received to decode a complete +/// message. +pub struct LengthDelimitedCodec<In, Out> { + state: State, + encode_buf: Vec<u8>, + __in: PhantomData<In>, + __out: PhantomData<Out>, +} + +enum State { + Length, + Data(u32), +} + +const MAX_MESSAGE_LEN: u32 = 1024 * 1024; +const MAGIC: u64 = 0xa4d1_019c_c910_1d4a; +const HEADER_LEN: usize = size_of::<u32>() + size_of::<u64>(); + +impl<In, Out> Default for LengthDelimitedCodec<In, Out> { + fn default() -> Self { + Self { + state: State::Length, + encode_buf: Vec::with_capacity(crate::ipccore::IPC_CLIENT_BUFFER_SIZE), + __in: PhantomData, + __out: PhantomData, + } + } +} + +impl<In, Out> LengthDelimitedCodec<In, Out> { + // Lengths are encoded as little endian u32 + fn decode_length(buf: &mut BytesMut) -> Option<u32> { + if buf.len() < HEADER_LEN { + // Not enough data + return None; + } + + let magic = LittleEndian::read_u64(&buf[0..8]); + assert_eq!(magic, MAGIC); + + // Consume the length field + let n = LittleEndian::read_u32(&buf[8..12]); + buf.advance(HEADER_LEN); + Some(n) + } + + fn decode_data(buf: &mut BytesMut, n: u32) -> io::Result<Option<Out>> + where + Out: DeserializeOwned + Debug, + { + let n = n.try_into().unwrap(); + + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if buf.len() < n { + return Ok(None); + } + + trace!("Attempting to decode"); + let msg = bincode::options() + .with_limit(MAX_MESSAGE_LEN as u64) + .deserialize::<Out>(&buf[..n]) + .map_err(|e| match *e { + bincode::ErrorKind::Io(e) => e, + _ => io::Error::new(io::ErrorKind::Other, *e), + })?; + buf.advance(n); + + trace!("... Decoded {:?}", msg); + Ok(Some(msg)) + } +} + +impl<In, Out> Codec for LengthDelimitedCodec<In, Out> +where + In: Serialize + Debug, + Out: DeserializeOwned + Debug, +{ + type In = In; + type Out = Out; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> { + let n = match self.state { + State::Length => { + match Self::decode_length(buf) { + Some(n) => { + assert!( + n <= MAX_MESSAGE_LEN, + "assertion failed: {} <= {}", + n, + MAX_MESSAGE_LEN + ); + self.state = State::Data(n); + + // Ensure that the buffer has enough space to read the + // incoming payload + buf.reserve(n.try_into().unwrap()); + + n + } + None => return Ok(None), + } + } + State::Data(n) => n, + }; + + match Self::decode_data(buf, n)? { + Some(data) => { + // Update the decode state + self.state = State::Length; + + // Make sure the buffer has enough space to read the next length header. + buf.reserve(HEADER_LEN); + + Ok(Some(data)) + } + None => Ok(None), + } + } + + fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> { + trace!("Attempting to encode"); + + self.encode_buf.clear(); + if let Err(e) = bincode::options() + .with_limit(MAX_MESSAGE_LEN as u64) + .serialize_into::<_, Self::In>(&mut self.encode_buf, &item) + { + trace!("message encode failed: {:?}", *e); + match *e { + bincode::ErrorKind::Io(e) => return Err(e), + _ => return Err(io::Error::new(io::ErrorKind::Other, *e)), + } + } + + let encoded_len = self.encode_buf.len(); + assert!(encoded_len <= MAX_MESSAGE_LEN as usize); + buf.reserve(encoded_len + HEADER_LEN); + buf.put_u64_le(MAGIC); + buf.put_u32_le(encoded_len.try_into().unwrap()); + buf.extend_from_slice(&self.encode_buf); + + Ok(()) + } +} |