summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-qpack/src/encoder.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-qpack/src/encoder.rs')
-rw-r--r--third_party/rust/neqo-qpack/src/encoder.rs1645
1 files changed, 1645 insertions, 0 deletions
diff --git a/third_party/rust/neqo-qpack/src/encoder.rs b/third_party/rust/neqo-qpack/src/encoder.rs
new file mode 100644
index 0000000000..a5ebd01666
--- /dev/null
+++ b/third_party/rust/neqo-qpack/src/encoder.rs
@@ -0,0 +1,1645 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use crate::decoder_instructions::{DecoderInstruction, DecoderInstructionReader};
+use crate::encoder_instructions::EncoderInstruction;
+use crate::header_block::HeaderEncoder;
+use crate::qlog;
+use crate::qpack_send_buf::QpackData;
+use crate::reader::ReceiverConnWrapper;
+use crate::stats::Stats;
+use crate::table::{HeaderTable, LookupResult, ADDITIONAL_TABLE_ENTRY_SIZE};
+use crate::{Error, QpackSettings, Res};
+use neqo_common::{qdebug, qerror, qlog::NeqoQlog, qtrace, Header};
+use neqo_transport::{Connection, Error as TransportError, StreamId};
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::convert::TryFrom;
+
+pub const QPACK_UNI_STREAM_TYPE_ENCODER: u64 = 0x2;
+
+#[derive(Debug, PartialEq)]
+enum LocalStreamState {
+ NoStream,
+ Uninitialized(StreamId),
+ Initialized(StreamId),
+}
+
+impl LocalStreamState {
+ pub fn stream_id(&self) -> Option<StreamId> {
+ match self {
+ Self::NoStream => None,
+ Self::Uninitialized(stream_id) | Self::Initialized(stream_id) => Some(*stream_id),
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct QPackEncoder {
+ table: HeaderTable,
+ max_table_size: u64,
+ max_entries: u64,
+ instruction_reader: DecoderInstructionReader,
+ local_stream: LocalStreamState,
+ max_blocked_streams: u16,
+ // Remember header blocks that are referring to dynamic table.
+ // There can be multiple header blocks in one stream, headers, trailer, push stream request, etc.
+ // This HashMap maps a stream ID to a list of header blocks. Each header block is a list of
+ // referenced dynamic table entries.
+ unacked_header_blocks: HashMap<StreamId, VecDeque<HashSet<u64>>>,
+ blocked_stream_cnt: u16,
+ use_huffman: bool,
+ next_capacity: Option<u64>,
+ stats: Stats,
+}
+
+impl QPackEncoder {
+ #[must_use]
+ pub fn new(qpack_settings: &QpackSettings, use_huffman: bool) -> Self {
+ Self {
+ table: HeaderTable::new(true),
+ max_table_size: qpack_settings.max_table_size_encoder,
+ max_entries: 0,
+ instruction_reader: DecoderInstructionReader::new(),
+ local_stream: LocalStreamState::NoStream,
+ max_blocked_streams: 0,
+ unacked_header_blocks: HashMap::new(),
+ blocked_stream_cnt: 0,
+ use_huffman,
+ next_capacity: None,
+ stats: Stats::default(),
+ }
+ }
+
+ /// This function is use for setting encoders table max capacity. The value is received as
+ /// a `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting parameter.
+ /// # Errors
+ /// `EncoderStream` if value is too big.
+ /// `ChangeCapacity` if table capacity cannot be reduced.
+ pub fn set_max_capacity(&mut self, cap: u64) -> Res<()> {
+ if cap > (1 << 30) - 1 {
+ return Err(Error::EncoderStream);
+ }
+
+ if cap == self.table.capacity() {
+ return Ok(());
+ }
+
+ qdebug!(
+ [self],
+ "Set max capacity to new capacity:{} old:{} max_table_size={}.",
+ cap,
+ self.table.capacity(),
+ self.max_table_size,
+ );
+
+ let new_cap = std::cmp::min(self.max_table_size, cap);
+ // we also set our table to the max allowed.
+ self.change_capacity(new_cap);
+ Ok(())
+ }
+
+ /// This function is use for setting encoders max blocked streams. The value is received as
+ /// a `SETTINGS_QPACK_BLOCKED_STREAMS` setting parameter.
+ /// # Errors
+ /// `EncoderStream` if value is too big.
+ pub fn set_max_blocked_streams(&mut self, blocked_streams: u64) -> Res<()> {
+ self.max_blocked_streams = u16::try_from(blocked_streams).or(Err(Error::EncoderStream))?;
+ Ok(())
+ }
+
+ /// Reads decoder instructions.
+ /// # Errors
+ /// May return: `ClosedCriticalStream` if stream has been closed or `DecoderStream`
+ /// in case of any other transport error.
+ pub fn receive(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
+ self.read_instructions(conn, stream_id)
+ .map_err(|e| map_error(&e))
+ }
+
+ fn read_instructions(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
+ qdebug!([self], "read a new instraction");
+ loop {
+ let mut recv = ReceiverConnWrapper::new(conn, stream_id);
+ match self.instruction_reader.read_instructions(&mut recv) {
+ Ok(instruction) => self.call_instruction(instruction, conn.qlog_mut())?,
+ Err(Error::NeedMoreData) => break Ok(()),
+ Err(e) => break Err(e),
+ }
+ }
+ }
+
+ fn recalculate_blocked_streams(&mut self) {
+ let acked_inserts_cnt = self.table.get_acked_inserts_cnt();
+ self.blocked_stream_cnt = 0;
+ for hb_list in self.unacked_header_blocks.values_mut() {
+ debug_assert!(!hb_list.is_empty());
+ if hb_list.iter().flatten().any(|e| *e >= acked_inserts_cnt) {
+ self.blocked_stream_cnt += 1;
+ }
+ }
+ }
+
+ #[allow(clippy::map_err_ignore)]
+ fn insert_count_instruction(&mut self, increment: u64) -> Res<()> {
+ self.table
+ .increment_acked(increment)
+ .map_err(|_| Error::DecoderStream)?;
+ self.recalculate_blocked_streams();
+ Ok(())
+ }
+
+ fn header_ack(&mut self, stream_id: StreamId) {
+ self.stats.header_acks_recv += 1;
+ let mut new_acked = self.table.get_acked_inserts_cnt();
+ if let Some(hb_list) = self.unacked_header_blocks.get_mut(&stream_id) {
+ if let Some(ref_list) = hb_list.pop_back() {
+ for iter in ref_list {
+ self.table.remove_ref(iter);
+ if iter >= new_acked {
+ new_acked = iter + 1;
+ }
+ }
+ } else {
+ debug_assert!(false, "We should have at least one header block.");
+ }
+ if hb_list.is_empty() {
+ self.unacked_header_blocks.remove(&stream_id);
+ }
+ }
+ if new_acked > self.table.get_acked_inserts_cnt() {
+ self.insert_count_instruction(new_acked - self.table.get_acked_inserts_cnt())
+ .expect("This should neve happen");
+ }
+ }
+
+ fn stream_cancellation(&mut self, stream_id: StreamId) {
+ self.stats.stream_cancelled_recv += 1;
+ let mut was_blocker = false;
+ if let Some(mut hb_list) = self.unacked_header_blocks.remove(&stream_id) {
+ debug_assert!(!hb_list.is_empty());
+ while let Some(ref_list) = hb_list.pop_front() {
+ for iter in ref_list {
+ self.table.remove_ref(iter);
+ was_blocker = was_blocker || (iter >= self.table.get_acked_inserts_cnt());
+ }
+ }
+ }
+ if was_blocker {
+ debug_assert!(self.blocked_stream_cnt > 0);
+ self.blocked_stream_cnt -= 1;
+ }
+ }
+
+ fn call_instruction(
+ &mut self,
+ instruction: DecoderInstruction,
+ qlog: &mut NeqoQlog,
+ ) -> Res<()> {
+ qdebug!([self], "call intruction {:?}", instruction);
+ match instruction {
+ DecoderInstruction::InsertCountIncrement { increment } => {
+ qlog::qpack_read_insert_count_increment_instruction(
+ qlog,
+ increment,
+ &increment.to_be_bytes(),
+ );
+
+ self.insert_count_instruction(increment)
+ }
+ DecoderInstruction::HeaderAck { stream_id } => {
+ self.header_ack(stream_id);
+ Ok(())
+ }
+ DecoderInstruction::StreamCancellation { stream_id } => {
+ self.stream_cancellation(stream_id);
+ Ok(())
+ }
+ DecoderInstruction::NoInstruction => Ok(()),
+ }
+ }
+
+ /// Inserts a new entry into a table and sends the corresponding instruction to a peer. An entry is added only
+ /// if it is possible to send the corresponding instruction immediately, i.e. the encoder stream is not
+ /// blocked by the flow control (or stream internal buffer(this is very unlikely)).
+ /// ### Errors
+ /// `EncoderStreamBlocked` if the encoder stream is blocked by the flow control.
+ /// `DynamicTableFull` if the dynamic table does not have enough space for the entry.
+ /// The function can return transport errors: `InvalidStreamId`, `InvalidInput` and `FinalSizeError`.
+ /// # Panics
+ /// When the insertion fails (it should not).
+ pub fn send_and_insert(
+ &mut self,
+ conn: &mut Connection,
+ name: &[u8],
+ value: &[u8],
+ ) -> Res<u64> {
+ qdebug!([self], "insert {:?} {:?}.", name, value);
+
+ let entry_size = name.len() + value.len() + ADDITIONAL_TABLE_ENTRY_SIZE;
+
+ if !self.table.insert_possible(entry_size) {
+ return Err(Error::DynamicTableFull);
+ }
+
+ let mut buf = QpackData::default();
+ EncoderInstruction::InsertWithNameLiteral { name, value }
+ .marshal(&mut buf, self.use_huffman);
+
+ let stream_id = self.local_stream.stream_id().ok_or(Error::Internal)?;
+
+ let sent = conn
+ .stream_send_atomic(stream_id, &buf)
+ .map_err(|e| map_stream_send_atomic_error(&e))?;
+ if !sent {
+ return Err(Error::EncoderStreamBlocked);
+ }
+
+ self.stats.dynamic_table_inserts += 1;
+
+ match self.table.insert(name, value) {
+ Ok(inx) => Ok(inx),
+ Err(e) => {
+ debug_assert!(false);
+ Err(e)
+ }
+ }
+ }
+
+ fn change_capacity(&mut self, value: u64) {
+ qdebug!([self], "change capacity: {}", value);
+ self.next_capacity = Some(value);
+ }
+
+ fn maybe_send_change_capacity(
+ &mut self,
+ conn: &mut Connection,
+ stream_id: StreamId,
+ ) -> Res<()> {
+ if let Some(cap) = self.next_capacity {
+ // Check if it is possible to reduce the capacity, e.g. if enough space can be make free for the reduction.
+ if cap < self.table.capacity() && !self.table.can_evict_to(cap) {
+ return Err(Error::DynamicTableFull);
+ }
+ let mut buf = QpackData::default();
+ EncoderInstruction::Capacity { value: cap }.marshal(&mut buf, self.use_huffman);
+ if !conn.stream_send_atomic(stream_id, &buf)? {
+ return Err(Error::EncoderStreamBlocked);
+ }
+ if self.table.set_capacity(cap).is_err() {
+ debug_assert!(
+ false,
+ "can_evict_to should have checked and make sure this operation is possible"
+ );
+ return Err(Error::InternalError(1));
+ }
+ self.max_entries = cap / 32;
+ self.next_capacity = None;
+ }
+ Ok(())
+ }
+
+ /// Sends any qpack encoder instructions.
+ /// # Errors
+ /// returns `EncoderStream` in case of an error.
+ pub fn send_encoder_updates(&mut self, conn: &mut Connection) -> Res<()> {
+ match self.local_stream {
+ LocalStreamState::NoStream => {
+ qerror!("Send call but there is no stream yet.");
+ Ok(())
+ }
+ LocalStreamState::Uninitialized(stream_id) => {
+ let mut buf = QpackData::default();
+ buf.encode_varint(QPACK_UNI_STREAM_TYPE_ENCODER);
+ if !conn.stream_send_atomic(stream_id, &buf[..])? {
+ return Err(Error::EncoderStreamBlocked);
+ }
+ self.local_stream = LocalStreamState::Initialized(stream_id);
+ self.maybe_send_change_capacity(conn, stream_id)
+ }
+ LocalStreamState::Initialized(stream_id) => {
+ self.maybe_send_change_capacity(conn, stream_id)
+ }
+ }
+ }
+
+ fn is_stream_blocker(&self, stream_id: StreamId) -> bool {
+ if let Some(hb_list) = self.unacked_header_blocks.get(&stream_id) {
+ debug_assert!(!hb_list.is_empty());
+ match hb_list.iter().flatten().max() {
+ Some(max_ref) => *max_ref >= self.table.get_acked_inserts_cnt(),
+ None => false,
+ }
+ } else {
+ false
+ }
+ }
+
+ /// Encodes headers
+ /// # Errors
+ /// `ClosedCriticalStream` if the encoder stream is closed.
+ /// `InternalError` if an unexpected error occurred.
+ /// # Panics
+ /// If there is a programming error.
+ pub fn encode_header_block(
+ &mut self,
+ conn: &mut Connection,
+ h: &[Header],
+ stream_id: StreamId,
+ ) -> HeaderEncoder {
+ qdebug!([self], "encoding headers.");
+
+ let mut encoder_blocked = false;
+ // Try to send capacity instructions if present.
+ if self.send_encoder_updates(conn).is_err() {
+ // This code doesn't try to deal with errors, it just tries
+ // to write to the encoder stream AND if it can't uses
+ // literal instructions.
+ // The errors can be:
+ // 1) `EncoderStreamBlocked` - this is an error that
+ // can occur.
+ // 2) `InternalError` - this is unexpected error.
+ // 3) `ClosedCriticalStream` - this is error that should
+ // close the HTTP/3 session.
+ // The last 2 errors are ignored here and will be picked up
+ // by the main loop.
+ encoder_blocked = true;
+ }
+
+ let mut encoded_h =
+ HeaderEncoder::new(self.table.base(), self.use_huffman, self.max_entries);
+
+ let stream_is_blocker = self.is_stream_blocker(stream_id);
+ let can_block = self.blocked_stream_cnt < self.max_blocked_streams || stream_is_blocker;
+
+ let mut ref_entries = HashSet::new();
+
+ for iter in h.iter() {
+ let name = iter.name().as_bytes().to_vec();
+ let value = iter.value().as_bytes().to_vec();
+ qtrace!("encoding {:x?} {:x?}.", name, value);
+
+ if let Some(LookupResult {
+ index,
+ static_table,
+ value_matches,
+ }) = self.table.lookup(&name, &value, can_block)
+ {
+ qtrace!(
+ [self],
+ "found a {} entry, value-match={}",
+ if static_table { "static" } else { "dynamic" },
+ value_matches
+ );
+ if value_matches {
+ if static_table {
+ encoded_h.encode_indexed_static(index);
+ } else {
+ encoded_h.encode_indexed_dynamic(index);
+ }
+ } else {
+ encoded_h.encode_literal_with_name_ref(static_table, index, &value);
+ }
+ if !static_table && ref_entries.insert(index) {
+ self.table.add_ref(index);
+ }
+ } else if can_block && !encoder_blocked {
+ // Insert using an InsertWithNameLiteral instruction. This entry name does not match any name in the
+ // tables therefore we cannot use any other instruction.
+ if let Ok(index) = self.send_and_insert(conn, &name, &value) {
+ encoded_h.encode_indexed_dynamic(index);
+ ref_entries.insert(index);
+ self.table.add_ref(index);
+ } else {
+ // This code doesn't try to deal with errors, it just tries
+ // to write to the encoder stream AND if it can't uses
+ // literal instructions.
+ // The errors can be:
+ // 1) `EncoderStreamBlocked` - this is an error that
+ // can occur.
+ // 2) `DynamicTableFull` - this is an error that
+ // can occur.
+ // 3) `InternalError` - this is unexpected error.
+ // 4) `ClosedCriticalStream` - this is error that should
+ // close the HTTP/3 session.
+ // The last 2 errors are ignored here and will be picked up
+ // by the main loop.
+ // As soon as one of the instructions cannot be written or the table is full, do not try again.
+ encoder_blocked = true;
+ encoded_h.encode_literal_with_name_literal(&name, &value);
+ }
+ } else {
+ encoded_h.encode_literal_with_name_literal(&name, &value);
+ }
+ }
+
+ encoded_h.encode_header_block_prefix();
+
+ if !stream_is_blocker {
+ // The streams was not a blocker, check if the stream is a blocker now.
+ if let Some(max_ref) = ref_entries.iter().max() {
+ if *max_ref >= self.table.get_acked_inserts_cnt() {
+ debug_assert!(self.blocked_stream_cnt <= self.max_blocked_streams);
+ self.blocked_stream_cnt += 1;
+ }
+ }
+ }
+
+ if !ref_entries.is_empty() {
+ self.unacked_header_blocks
+ .entry(stream_id)
+ .or_insert_with(VecDeque::new)
+ .push_front(ref_entries);
+ self.stats.dynamic_table_references += 1;
+ }
+ encoded_h
+ }
+
+ /// Encoder stream has been created. Add the stream id.
+ /// # Panics
+ /// If a stream has already been added.
+ pub fn add_send_stream(&mut self, stream_id: StreamId) {
+ if self.local_stream == LocalStreamState::NoStream {
+ self.local_stream = LocalStreamState::Uninitialized(stream_id);
+ } else {
+ panic!("Adding multiple local streams");
+ }
+ }
+
+ #[must_use]
+ pub fn stats(&self) -> Stats {
+ self.stats.clone()
+ }
+
+ #[must_use]
+ pub fn local_stream_id(&self) -> Option<StreamId> {
+ self.local_stream.stream_id()
+ }
+
+ #[cfg(test)]
+ fn blocked_stream_cnt(&self) -> u16 {
+ self.blocked_stream_cnt
+ }
+}
+
+impl ::std::fmt::Display for QPackEncoder {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ write!(f, "QPackEncoder")
+ }
+}
+
+fn map_error(err: &Error) -> Error {
+ if *err == Error::ClosedCriticalStream {
+ Error::ClosedCriticalStream
+ } else {
+ Error::DecoderStream
+ }
+}
+
+fn map_stream_send_atomic_error(err: &TransportError) -> Error {
+ match err {
+ TransportError::InvalidStreamId | TransportError::FinalSizeError => {
+ Error::ClosedCriticalStream
+ }
+ _ => {
+ debug_assert!(false, "Unexpected error");
+ Error::InternalError(2)
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{Connection, Error, Header, QPackEncoder, Res};
+ use crate::QpackSettings;
+ use neqo_transport::{ConnectionParameters, StreamId, StreamType};
+ use std::mem;
+ use test_fixture::{default_client, default_server, handshake, new_server, now, DEFAULT_ALPN};
+
+ struct TestEncoder {
+ encoder: QPackEncoder,
+ send_stream_id: StreamId,
+ recv_stream_id: StreamId,
+ conn: Connection,
+ peer_conn: Connection,
+ }
+
+ impl TestEncoder {
+ pub fn change_capacity(&mut self, capacity: u64) -> Res<()> {
+ self.encoder.set_max_capacity(capacity).unwrap();
+ // We will try to really change the table only when we send the change capacity instruction.
+ self.encoder.send_encoder_updates(&mut self.conn)
+ }
+
+ pub fn insert(&mut self, header: &[u8], value: &[u8], inst: &[u8]) {
+ let res = self.encoder.send_and_insert(&mut self.conn, header, value);
+ assert!(res.is_ok());
+ self.send_instructions(inst);
+ }
+
+ pub fn encode_header_block(
+ &mut self,
+ stream_id: StreamId,
+ headers: &[Header],
+ expected_encoding: &[u8],
+ inst: &[u8],
+ ) {
+ let buf = self
+ .encoder
+ .encode_header_block(&mut self.conn, headers, stream_id);
+ assert_eq!(&buf[..], expected_encoding);
+ self.send_instructions(inst);
+ }
+
+ pub fn send_instructions(&mut self, encoder_instruction: &[u8]) {
+ self.encoder.send_encoder_updates(&mut self.conn).unwrap();
+ let out = self.conn.process(None, now());
+ let out2 = self.peer_conn.process(out.dgram(), now());
+ mem::drop(self.conn.process(out2.dgram(), now()));
+ let mut buf = [0_u8; 100];
+ let (amount, fin) = self
+ .peer_conn
+ .stream_recv(self.send_stream_id, &mut buf)
+ .unwrap();
+ assert!(!fin);
+ assert_eq!(buf[..amount], encoder_instruction[..]);
+ }
+ }
+
+ fn connect_generic(huffman: bool, max_data: Option<u64>) -> TestEncoder {
+ let mut conn = default_client();
+ let mut peer_conn = max_data.map_or_else(default_server, |max| {
+ new_server(
+ DEFAULT_ALPN,
+ ConnectionParameters::default()
+ .max_stream_data(StreamType::UniDi, true, max)
+ .max_stream_data(StreamType::BiDi, true, max)
+ .max_stream_data(StreamType::BiDi, false, max),
+ )
+ });
+ handshake(&mut conn, &mut peer_conn);
+
+ // create a stream
+ let recv_stream_id = peer_conn.stream_create(StreamType::UniDi).unwrap();
+ let send_stream_id = conn.stream_create(StreamType::UniDi).unwrap();
+
+ // create an encoder
+ let mut encoder = QPackEncoder::new(
+ &QpackSettings {
+ max_table_size_encoder: 1500,
+ max_table_size_decoder: 0,
+ max_blocked_streams: 0,
+ },
+ huffman,
+ );
+ encoder.add_send_stream(send_stream_id);
+
+ TestEncoder {
+ encoder,
+ send_stream_id,
+ recv_stream_id,
+ conn,
+ peer_conn,
+ }
+ }
+
+ fn connect(huffman: bool) -> TestEncoder {
+ connect_generic(huffman, None)
+ }
+
+ fn connect_flow_control(max_data: u64) -> TestEncoder {
+ connect_generic(true, Some(max_data))
+ }
+
+ fn recv_instruction(encoder: &mut TestEncoder, decoder_instruction: &[u8]) {
+ encoder
+ .peer_conn
+ .stream_send(encoder.recv_stream_id, decoder_instruction)
+ .unwrap();
+ let out = encoder.peer_conn.process(None, now());
+ mem::drop(encoder.conn.process(out.dgram(), now()));
+ assert!(encoder
+ .encoder
+ .read_instructions(&mut encoder.conn, encoder.recv_stream_id)
+ .is_ok());
+ }
+
+ const CAP_INSTRUCTION_200: &[u8] = &[0x02, 0x3f, 0xa9, 0x01];
+ const CAP_INSTRUCTION_60: &[u8] = &[0x02, 0x3f, 0x1d];
+ const CAP_INSTRUCTION_1000: &[u8] = &[0x02, 0x3f, 0xc9, 0x07];
+ const CAP_INSTRUCTION_1500: &[u8] = &[0x02, 0x3f, 0xbd, 0x0b];
+
+ const HEADER_CONTENT_LENGTH: &[u8] = &[
+ 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
+ ];
+ const VALUE_1: &[u8] = &[0x31, 0x32, 0x33, 0x34];
+ const VALUE_2: &[u8] = &[0x31, 0x32, 0x33, 0x34, 0x35];
+
+ // HEADER_CONTENT_LENGTH and VALUE_1 encoded by instruction insert_with_name_literal.
+ const HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL: &[u8] = &[
+ 0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
+ 0x04, 0x31, 0x32, 0x33, 0x34,
+ ];
+
+ // HEADER_CONTENT_LENGTH and VALUE_2 encoded by instruction insert_with_name_literal.
+ const HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL: &[u8] = &[
+ 0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
+ 0x05, 0x31, 0x32, 0x33, 0x34, 0x35,
+ ];
+
+ // Indexed Header Field that refers to the first entry in the dynamic table.
+ const ENCODE_INDEXED_REF_DYNAMIC: &[u8] = &[0x02, 0x00, 0x80];
+
+ const STREAM_1: StreamId = StreamId::new(1);
+ const STREAM_2: StreamId = StreamId::new(2);
+ const HEADER_ACK_STREAM_ID_1: &[u8] = &[0x81];
+ const HEADER_ACK_STREAM_ID_2: &[u8] = &[0x82];
+ const STREAM_CANCELED_ID_1: &[u8] = &[0x41];
+
+ // test insert_with_name_literal which fails because there is not enough space in the table
+ #[test]
+ fn test_insert_with_name_literal_1() {
+ let mut encoder = connect(false);
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+ assert_eq!(Error::DynamicTableFull, res.unwrap_err());
+ encoder.send_instructions(&[0x02]);
+ }
+
+ // test insert_with_name_literal - succeeds
+ #[test]
+ fn test_insert_with_name_literal_2() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+ // test the change capacity instruction.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+ }
+
+ #[test]
+ fn test_change_capacity() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+ }
+
+ struct TestElement {
+ pub headers: Vec<Header>,
+ pub header_block: &'static [u8],
+ pub encoder_inst: &'static [u8],
+ }
+
+ #[test]
+ fn test_header_block_encoder_non() {
+ let test_cases: [TestElement; 6] = [
+ // test a header with ref to static - encode_indexed
+ TestElement {
+ headers: vec![Header::new(":method", "GET")],
+ header_block: &[0x00, 0x00, 0xd1],
+ encoder_inst: &[],
+ },
+ // test encode_literal_with_name_ref
+ TestElement {
+ headers: vec![Header::new(":path", "/somewhere")],
+ header_block: &[
+ 0x00, 0x00, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65, 0x72,
+ 0x65,
+ ],
+ encoder_inst: &[],
+ },
+ // test adding a new header and encode_post_base_index, also test fix_header_block_prefix
+ TestElement {
+ headers: vec![Header::new("my-header", "my-value")],
+ header_block: &[0x02, 0x80, 0x10],
+ encoder_inst: &[
+ 0x49, 0x6d, 0x79, 0x2d, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x08, 0x6d, 0x79,
+ 0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65,
+ ],
+ },
+ // test encode_indexed with a ref to dynamic table.
+ TestElement {
+ headers: vec![Header::new("my-header", "my-value")],
+ header_block: ENCODE_INDEXED_REF_DYNAMIC,
+ encoder_inst: &[],
+ },
+ // test encode_literal_with_name_ref.
+ TestElement {
+ headers: vec![Header::new("my-header", "my-value2")],
+ header_block: &[
+ 0x02, 0x00, 0x40, 0x09, 0x6d, 0x79, 0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32,
+ ],
+ encoder_inst: &[],
+ },
+ // test multiple headers
+ TestElement {
+ headers: vec![
+ Header::new(":method", "GET"),
+ Header::new(":path", "/somewhere"),
+ Header::new(":authority", "example.com"),
+ Header::new(":scheme", "https"),
+ ],
+ header_block: &[
+ 0x00, 0x01, 0xd1, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65,
+ 0x72, 0x65, 0x50, 0x0b, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63,
+ 0x6f, 0x6d, 0xd7,
+ ],
+ encoder_inst: &[],
+ },
+ ];
+
+ let mut encoder = connect(false);
+
+ encoder.encoder.set_max_blocked_streams(100).unwrap();
+ encoder.encoder.set_max_capacity(200).unwrap();
+
+ // test the change capacity instruction.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ for t in &test_cases {
+ let buf = encoder
+ .encoder
+ .encode_header_block(&mut encoder.conn, &t.headers, STREAM_1);
+ assert_eq!(&buf[..], t.header_block);
+ encoder.send_instructions(t.encoder_inst);
+ }
+ }
+
+ #[test]
+ fn test_header_block_encoder_huffman() {
+ let test_cases: [TestElement; 6] = [
+ // test a header with ref to static - encode_indexed
+ TestElement {
+ headers: vec![Header::new(":method", "GET")],
+ header_block: &[0x00, 0x00, 0xd1],
+ encoder_inst: &[],
+ },
+ // test encode_literal_with_name_ref
+ TestElement {
+ headers: vec![Header::new(":path", "/somewhere")],
+ header_block: &[
+ 0x00, 0x00, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85,
+ ],
+ encoder_inst: &[],
+ },
+ // test adding a new header and encode_post_base_index, also test fix_header_block_prefix
+ TestElement {
+ headers: vec![Header::new("my-header", "my-value")],
+ header_block: &[0x02, 0x80, 0x10],
+ encoder_inst: &[
+ 0x67, 0xa7, 0xd2, 0xd3, 0x94, 0x72, 0x16, 0xcf, 0x86, 0xa7, 0xd2, 0xdd, 0xc7,
+ 0x45, 0xa5,
+ ],
+ },
+ // test encode_indexed with a ref to dynamic table.
+ TestElement {
+ headers: vec![Header::new("my-header", "my-value")],
+ header_block: ENCODE_INDEXED_REF_DYNAMIC,
+ encoder_inst: &[],
+ },
+ // test encode_literal_with_name_ref.
+ TestElement {
+ headers: vec![Header::new("my-header", "my-value2")],
+ header_block: &[
+ 0x02, 0x00, 0x40, 0x87, 0xa7, 0xd2, 0xdd, 0xc7, 0x45, 0xa5, 0x17,
+ ],
+ encoder_inst: &[],
+ },
+ // test multiple headers
+ TestElement {
+ headers: vec![
+ Header::new(":method", "GET"),
+ Header::new(":path", "/somewhere"),
+ Header::new(":authority", "example.com"),
+ Header::new(":scheme", "https"),
+ ],
+ header_block: &[
+ 0x00, 0x01, 0xd1, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85, 0x50,
+ 0x88, 0x2f, 0x91, 0xd3, 0x5d, 0x05, 0x5c, 0x87, 0xa7, 0xd7,
+ ],
+ encoder_inst: &[],
+ },
+ ];
+
+ let mut encoder = connect(true);
+
+ encoder.encoder.set_max_blocked_streams(100).unwrap();
+ encoder.encoder.set_max_capacity(200).unwrap();
+
+ // test the change capacity instruction.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ for t in &test_cases {
+ let buf = encoder
+ .encoder
+ .encode_header_block(&mut encoder.conn, &t.headers, STREAM_1);
+ assert_eq!(&buf[..], t.header_block);
+ encoder.send_instructions(t.encoder_inst);
+ }
+ }
+
+ // Test inserts block on waiting for an insert count increment.
+ #[test]
+ fn test_insertion_blocked_on_insert_count_feedback() {
+ let mut encoder = connect(false);
+
+ encoder.encoder.set_max_capacity(60).unwrap();
+
+ // test the change capacity instruction.
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ // insert "content-length: 12345 which will fail because the ntry in the table cannot be evicted.
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
+ assert!(res.is_err());
+ encoder.send_instructions(&[]);
+
+ // receive an insert count increment.
+ recv_instruction(&mut encoder, &[0x01]);
+
+ // insert "content-length: 12345 again it will succeed.
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
+ }
+
+ // Test inserts block on waiting for acks
+ // test the table insertion is blocked:
+ // 0 - waiting for a header ack
+ // 2 - waiting for a stream cancel.
+ fn test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(wait: u8) {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(60).is_ok());
+ // test the change capacity instruction.
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ // receive an insert count increment.
+ recv_instruction(&mut encoder, &[0x01]);
+
+ // send a header block
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ STREAM_1,
+ );
+ assert_eq!(&buf[..], ENCODE_INDEXED_REF_DYNAMIC);
+ encoder.send_instructions(&[]);
+
+ // insert "content-length: 12345 which will fail because the entry in the table cannot be evicted
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
+ assert!(res.is_err());
+ encoder.send_instructions(&[]);
+
+ if wait == 0 {
+ // receive a header_ack.
+ recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
+ } else {
+ // receive a stream canceled
+ recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
+ }
+
+ // insert "content-length: 12345 again it will succeed.
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
+ }
+
+ #[test]
+ fn test_header_ack() {
+ test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(0);
+ }
+
+ #[test]
+ fn test_stream_canceled() {
+ test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(1);
+ }
+
+ fn assert_is_index_to_dynamic(buf: &[u8]) {
+ assert_eq!(buf[2] & 0xc0, 0x80);
+ }
+
+ fn assert_is_index_to_dynamic_post(buf: &[u8]) {
+ assert_eq!(buf[2] & 0xf0, 0x10);
+ }
+
+ fn assert_is_index_to_static_name_only(buf: &[u8]) {
+ assert_eq!(buf[2] & 0xf0, 0x50);
+ }
+
+ fn assert_is_literal_value_literal_name(buf: &[u8]) {
+ assert_eq!(buf[2] & 0xf0, 0x20);
+ }
+
+ #[test]
+ fn max_block_streams1() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(60).is_ok());
+
+ // change capacity to 60.
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ encoder.encoder.set_max_blocked_streams(1).unwrap();
+
+ // send a header block, it refers to unacked entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ encoder.send_instructions(&[]);
+
+ // The next one will not use the dynamic entry because it is exceeding the max_blocked_streams
+ // limit.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ StreamId::new(2),
+ );
+ assert_is_index_to_static_name_only(&buf);
+
+ encoder.send_instructions(&[]);
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // another header block to already blocked stream can still use the entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+ }
+
+ #[test]
+ fn max_block_streams2() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+
+ // change capacity to 200.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ // insert "content-length: 12345
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
+
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
+
+ encoder.encoder.set_max_blocked_streams(1).unwrap();
+
+ // send a header block, it refers to unacked entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ // encode another header block for the same stream that will refer to the second entry
+ // in the dynamic table.
+ // This should work because the stream is already a blocked stream
+ // send a header block, it refers to unacked entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "12345")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+ }
+
+ #[test]
+ fn max_block_streams3() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+
+ // change capacity to 200.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ encoder.encoder.set_max_blocked_streams(1).unwrap();
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+
+ // send a header block, that creates an new entry and refers to it.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // The next one will not create a new entry because the encoder is on max_blocked_streams limit.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name2", "value2")],
+ STREAM_2,
+ );
+ assert_is_literal_value_literal_name(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // another header block to already blocked stream can still create a new entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name2", "value2")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+ }
+
+ #[test]
+ fn max_block_streams4() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+
+ // change capacity to 200.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ encoder.encoder.set_max_blocked_streams(1).unwrap();
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+
+ // send a header block, that creates an new entry and refers to it.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // another header block to already blocked stream can still create a new entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name2", "value2")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // receive a header_ack for the first header block.
+ recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
+
+ // The stream is still blocking because the second header block is not acked.
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+ }
+
+ #[test]
+ fn max_block_streams5() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+
+ // change capacity to 200.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ encoder.encoder.set_max_blocked_streams(1).unwrap();
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+
+ // send a header block, that creates an new entry and refers to it.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // another header block to already blocked stream can still create a new entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // receive a header_ack for the first header block.
+ recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
+
+ // The stream is not blocking anymore because header ack also acks the instruction.
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+ }
+
+ #[test]
+ fn max_block_streams6() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+
+ // change capacity to 200.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ encoder.encoder.set_max_blocked_streams(2).unwrap();
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+
+ // send a header block, that creates an new entry and refers to it.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // header block for the next stream will create an new entry as well.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name2", "value2")],
+ STREAM_2,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
+
+ // receive a header_ack for the second header block. This will ack the first as well
+ recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_2);
+
+ // The stream is not blocking anymore because header ack also acks the instruction.
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+ }
+
+ #[test]
+ fn max_block_streams7() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+
+ // change capacity to 200.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ encoder.encoder.set_max_blocked_streams(2).unwrap();
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+
+ // send a header block, that creates an new entry and refers to it.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // header block for the next stream will create an new entry as well.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_2,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
+
+ // receive a stream cancel for the first stream.
+ // This will remove the first stream as blocking but it will not mark the instruction as acked.
+ // and the second steam will still be blocking.
+ recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
+
+ // The stream is not blocking anymore because header ack also acks the instruction.
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+ }
+
+ #[test]
+ fn max_block_stream8() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(200).is_ok());
+
+ // change capacity to 200.
+ encoder.send_instructions(CAP_INSTRUCTION_200);
+
+ encoder.encoder.set_max_blocked_streams(2).unwrap();
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
+
+ // send a header block, that creates an new entry and refers to it.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+
+ // header block for the next stream will refer to the same entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name1", "value1")],
+ STREAM_2,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
+
+ // send another header block on stream 1.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("name2", "value2")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic_post(&buf);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);
+
+ // stream 1 is block on entries 1 and 2; stream 2 is block only on 1.
+ // receive an Insert Count Increment for the first entry.
+ // After that only stream 1 will be blocking.
+ recv_instruction(&mut encoder, &[0x01]);
+
+ assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
+ }
+
+ #[test]
+ fn dynamic_table_can_evict1() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(60).is_ok());
+
+ // change capacity to 60.
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ encoder.encoder.set_max_blocked_streams(2).unwrap();
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ // send a header block, it refers to unacked entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ // trying to evict the entry will failed.
+ assert!(encoder.change_capacity(10).is_err());
+
+ // receive an Insert Count Increment for the entry.
+ recv_instruction(&mut encoder, &[0x01]);
+
+ // trying to evict the entry will failed. The stream is still referring to it.
+ assert!(encoder.change_capacity(10).is_err());
+
+ // receive a header_ack for the header block.
+ recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
+
+ // now entry can be evicted.
+ assert!(encoder.change_capacity(10).is_ok());
+ }
+
+ #[test]
+ fn dynamic_table_can_evict2() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(60).is_ok());
+
+ // change capacity to 60.
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ encoder.encoder.set_max_blocked_streams(2).unwrap();
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ // send a header block, it refers to unacked entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ // trying to evict the entry will failed.
+ assert!(encoder.change_capacity(10).is_err());
+
+ // receive an Insert Count Increment for the entry.
+ recv_instruction(&mut encoder, &[0x01]);
+
+ // trying to evict the entry will failed. The stream is still referring to it.
+ assert!(encoder.change_capacity(10).is_err());
+
+ // receive a stream cancelled.
+ recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
+
+ // now entry can be evicted.
+ assert!(encoder.change_capacity(10).is_ok());
+ }
+
+ #[test]
+ fn dynamic_table_can_evict3() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(60).is_ok());
+
+ // change capacity to 60.
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ encoder.encoder.set_max_blocked_streams(2).unwrap();
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ // trying to evict the entry will failed, because the entry is not acked.
+ assert!(encoder.change_capacity(10).is_err());
+
+ // receive an Insert Count Increment for the entry.
+ recv_instruction(&mut encoder, &[0x01]);
+
+ // now entry can be evicted.
+ assert!(encoder.change_capacity(10).is_ok());
+ }
+
+ #[test]
+ fn dynamic_table_can_evict4() {
+ let mut encoder = connect(false);
+
+ assert!(encoder.encoder.set_max_capacity(60).is_ok());
+
+ // change capacity to 60.
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ encoder.encoder.set_max_blocked_streams(2).unwrap();
+
+ // insert "content-length: 1234
+ let res =
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
+
+ assert!(res.is_ok());
+ encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
+
+ // send a header block, it refers to unacked entry.
+ let buf = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[Header::new("content-length", "1234")],
+ STREAM_1,
+ );
+ assert_is_index_to_dynamic(&buf);
+
+ // trying to evict the entry will failed. The stream is still referring to it and
+ // entry is not acked.
+ assert!(encoder.change_capacity(10).is_err());
+
+ // receive a header_ack for the header block. This will also ack the instruction.
+ recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
+
+ // now entry can be evicted.
+ assert!(encoder.change_capacity(10).is_ok());
+ }
+
+ #[test]
+ fn encoder_flow_controlled_blocked() {
+ const SMALL_MAX_DATA: u64 = 20;
+ const ONE_INSTRUCTION_1: &[u8] = &[
+ 0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x7f, 0x83, 0x8, 0x99, 0x6b,
+ ];
+ const ONE_INSTRUCTION_2: &[u8] = &[
+ 0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x37, 0x83, 0x8, 0x99, 0x6b,
+ ];
+
+ let mut encoder = connect_flow_control(SMALL_MAX_DATA);
+
+ // change capacity to 1000 and max_block streams to 20.
+ encoder.encoder.set_max_blocked_streams(20).unwrap();
+ assert!(encoder.encoder.set_max_capacity(1000).is_ok());
+ encoder.send_instructions(CAP_INSTRUCTION_1000);
+
+ // Encode a header block with 2 headers. The first header will be added to the dynamic table.
+ // The second will not be added to the dynamic table, because the corresponding instruction
+ // cannot be written immediately due to the flow control limit.
+ let buf1 = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[
+ Header::new("something", "1234"),
+ Header::new("something2", "12345678910"),
+ ],
+ STREAM_1,
+ );
+
+ // Assert that the first header is encoded as an index to the dynamic table (a post form).
+ assert_eq!(buf1[2], 0x10);
+ // Assert that the second header is encoded as a literal with a name literal
+ assert_eq!(buf1[3] & 0xf0, 0x20);
+
+ // Try to encode another header block. Here both headers will be encoded as a literal with a name literal
+ let buf2 = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[
+ Header::new("something3", "1234"),
+ Header::new("something4", "12345678910"),
+ ],
+ STREAM_2,
+ );
+ assert_eq!(buf2[2] & 0xf0, 0x20);
+
+ // Ensure that we have sent only one instruction for (String::from("something", "1234"))
+ encoder.send_instructions(ONE_INSTRUCTION_1);
+
+ // exchange a flow control update.
+ let out = encoder.peer_conn.process(None, now());
+ mem::drop(encoder.conn.process(out.dgram(), now()));
+
+ // Try writing a new header block. Now, headers will be added to the dynamic table again, because
+ // instructions can be sent.
+ let buf3 = encoder.encoder.encode_header_block(
+ &mut encoder.conn,
+ &[
+ Header::new("something5", "1234"),
+ Header::new("something6", "12345678910"),
+ ],
+ StreamId::new(3),
+ );
+ // Assert that the first header is encoded as an index to the dynamic table (a post form).
+ assert_eq!(buf3[2], 0x10);
+ // Assert that the second header is encoded as a literal with a name literal
+ assert_eq!(buf3[3] & 0xf0, 0x20);
+
+ // Asset that one instruction has been sent
+ encoder.send_instructions(ONE_INSTRUCTION_2);
+ }
+
+ #[test]
+ fn encoder_max_capacity_limit() {
+ let mut encoder = connect(false);
+
+ // change capacity to 2000.
+ assert!(encoder.encoder.set_max_capacity(2000).is_ok());
+ encoder.send_instructions(CAP_INSTRUCTION_1500);
+ }
+
+ #[test]
+ fn test_do_not_evict_entry_that_are_referred_only_by_the_same_header_blocked_encoding() {
+ let mut encoder = connect(false);
+
+ encoder.encoder.set_max_blocked_streams(20).unwrap();
+ assert!(encoder.change_capacity(50).is_ok());
+
+ encoder
+ .encoder
+ .send_and_insert(&mut encoder.conn, b"something5", b"1234")
+ .unwrap();
+
+ encoder
+ .encoder
+ .send_encoder_updates(&mut encoder.conn)
+ .unwrap();
+ let out = encoder.conn.process(None, now());
+ mem::drop(encoder.peer_conn.process(out.dgram(), now()));
+ // receive an insert count increment.
+ recv_instruction(&mut encoder, &[0x01]);
+
+ // The first header will use the table entry and the second will use the literal
+ // encoding because the first entry is referred to and cannot be evicted.
+ assert_eq!(
+ encoder
+ .encoder
+ .encode_header_block(
+ &mut encoder.conn,
+ &[
+ Header::new("something5", "1234"),
+ Header::new("something6", "1234"),
+ ],
+ StreamId::new(3),
+ )
+ .to_vec(),
+ &[
+ 0x02, 0x00, 0x80, 0x27, 0x03, 0x73, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x69, 0x6e, 0x67,
+ 0x36, 0x04, 0x31, 0x32, 0x33, 0x34
+ ]
+ );
+ // Also check that ther is no new instruction send by the encoder.
+ assert!(encoder.conn.process_output(now()).dgram().is_none());
+ }
+
+ #[test]
+ fn test_streams_cancel_cleans_up_unacked_header_blocks() {
+ let mut encoder = connect(false);
+
+ encoder.encoder.set_max_blocked_streams(10).unwrap();
+ assert!(encoder.change_capacity(60).is_ok());
+ encoder.send_instructions(CAP_INSTRUCTION_60);
+
+ // insert "content-length: 1234
+ encoder.insert(
+ HEADER_CONTENT_LENGTH,
+ VALUE_1,
+ HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL,
+ );
+
+ // send a header block
+ encoder.encode_header_block(
+ StreamId::new(1),
+ &[Header::new("content-length", "1234")],
+ ENCODE_INDEXED_REF_DYNAMIC,
+ &[],
+ );
+
+ // receive a stream canceled instruction.
+ recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
+
+ recv_instruction(&mut encoder, &[0x01]);
+ }
+}