summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-http3/src/connection.rs
blob: 6da7d2a9599495aad4a104c733356c4c556ad371 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![allow(clippy::module_name_repetitions)]

use crate::control_stream_local::{ControlStreamLocal, HTTP3_UNI_STREAM_TYPE_CONTROL};
use crate::control_stream_remote::ControlStreamRemote;
use crate::hframe::HFrame;
use crate::send_message::SendMessage;
use crate::settings::{HSetting, HSettingType, HSettings, HttpZeroRttChecker};
use crate::stream_type_reader::NewStreamTypeReader;
use crate::{RecvStream, ResetType};
use neqo_common::{qdebug, qerror, qinfo, qtrace, qwarn};
use neqo_qpack::decoder::{QPackDecoder, QPACK_UNI_STREAM_TYPE_DECODER};
use neqo_qpack::encoder::{QPackEncoder, QPACK_UNI_STREAM_TYPE_ENCODER};
use neqo_qpack::QpackSettings;
use neqo_transport::{AppError, CloseError, Connection, State, StreamType};
use std::collections::{BTreeSet, HashMap};
use std::fmt::Debug;
use std::mem;

use crate::{Error, Res};

const HTTP3_UNI_STREAM_TYPE_PUSH: u64 = 0x1;
const QPACK_TABLE_SIZE_LIMIT: u64 = 1 << 30;

pub(crate) enum HandleReadableOutput {
    StreamNotFound,
    NoOutput,
    PushStream,
    ControlFrames(Vec<HFrame>),
}

#[derive(Debug)]
enum Http3RemoteSettingsState {
    NotReceived,
    Received(HSettings),
    ZeroRtt(HSettings),
}

#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Clone)]
pub enum Http3State {
    Initializing,
    ZeroRtt,
    Connected,
    GoingAway(u64),
    Closing(CloseError),
    Closed(CloseError),
}

impl Http3State {
    #[must_use]
    pub fn active(&self) -> bool {
        matches!(self, Http3State::Connected | Http3State::GoingAway(_) | Http3State::ZeroRtt)
    }
}

#[derive(Debug)]
pub(crate) struct Http3Connection {
    pub state: Http3State,
    local_qpack_settings: QpackSettings,
    control_stream_local: ControlStreamLocal,
    control_stream_remote: ControlStreamRemote,
    new_streams: HashMap<u64, NewStreamTypeReader>,
    pub qpack_encoder: QPackEncoder,
    pub qpack_decoder: QPackDecoder,
    settings_state: Http3RemoteSettingsState,
    streams_have_data_to_send: BTreeSet<u64>,
    pub send_streams: HashMap<u64, SendMessage>,
    pub recv_streams: HashMap<u64, Box<dyn RecvStream>>,
}

impl ::std::fmt::Display for Http3Connection {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "Http3 connection")
    }
}

impl Http3Connection {
    /// Create a new connection.
    pub fn new(local_qpack_settings: QpackSettings) -> Self {
        if (local_qpack_settings.max_table_size_encoder >= QPACK_TABLE_SIZE_LIMIT)
            || (local_qpack_settings.max_table_size_decoder >= QPACK_TABLE_SIZE_LIMIT)
        {
            panic!("Wrong max_table_size");
        }
        Self {
            state: Http3State::Initializing,
            local_qpack_settings,
            control_stream_local: ControlStreamLocal::new(),
            control_stream_remote: ControlStreamRemote::new(),
            new_streams: HashMap::new(),
            qpack_encoder: QPackEncoder::new(local_qpack_settings, true),
            qpack_decoder: QPackDecoder::new(local_qpack_settings),
            settings_state: Http3RemoteSettingsState::NotReceived,
            streams_have_data_to_send: BTreeSet::new(),
            send_streams: HashMap::new(),
            recv_streams: HashMap::new(),
        }
    }

    fn initialize_http3_connection(&mut self, conn: &mut Connection) -> Res<()> {
        qinfo!([self], "Initialize the http3 connection.");
        self.control_stream_local.create(conn)?;

        self.send_settings();
        self.create_qpack_streams(conn)?;
        Ok(())
    }

    fn send_settings(&mut self) {
        qdebug!([self], "Send settings.");
        self.control_stream_local.queue_frame(&HFrame::Settings {
            settings: HSettings::new(&[
                HSetting {
                    setting_type: HSettingType::MaxTableCapacity,
                    value: self.qpack_decoder.get_max_table_size(),
                },
                HSetting {
                    setting_type: HSettingType::BlockedStreams,
                    value: self.qpack_decoder.get_blocked_streams().into(),
                },
            ]),
        });
        self.control_stream_local.queue_frame(&HFrame::Grease);
    }

    /// Save settings for adding to the session ticket.
    pub(crate) fn save_settings(&self) -> Vec<u8> {
        HttpZeroRttChecker::save(self.local_qpack_settings)
    }

    fn create_qpack_streams(&mut self, conn: &mut Connection) -> Res<()> {
        qdebug!([self], "create_qpack_streams.");
        self.qpack_encoder
            .add_send_stream(conn.stream_create(StreamType::UniDi)?);
        self.qpack_decoder
            .add_send_stream(conn.stream_create(StreamType::UniDi)?);
        Ok(())
    }

    /// Inform a `HttpConnection` that a stream has data to send and that `send` should be called for the stream.
    pub fn insert_streams_have_data_to_send(&mut self, stream_id: u64) {
        self.streams_have_data_to_send.insert(stream_id);
    }

    /// Return true if there is a stream that needs to send data.
    pub fn has_data_to_send(&self) -> bool {
        !self.streams_have_data_to_send.is_empty()
    }

    /// Call `send` for all streams that need to send data.
    pub fn process_sending(&mut self, conn: &mut Connection) -> Res<()> {
        // check if control stream has data to send.
        self.control_stream_local.send(conn)?;

        let to_send = mem::replace(&mut self.streams_have_data_to_send, BTreeSet::new());
        for stream_id in to_send {
            let mut remove = false;
            if let Some(s) = &mut self.send_streams.get_mut(&stream_id) {
                s.send(conn, &mut self.qpack_encoder)?;
                if s.has_data_to_send() {
                    self.streams_have_data_to_send.insert(stream_id);
                }
                remove = s.done();
            }
            if remove {
                self.send_streams.remove(&stream_id);
            }
        }
        self.qpack_decoder.send(conn)?;
        match self.qpack_encoder.send(conn) {
            Ok(())
            | Err(neqo_qpack::Error::EncoderStreamBlocked)
            | Err(neqo_qpack::Error::DynamicTableFull) => {}
            Err(e) => return Err(Error::QpackError(e)),
        }
        Ok(())
    }

    /// We have a resumption token which remembers previous settings. Update the setting.
    pub fn set_0rtt_settings(&mut self, conn: &mut Connection, settings: HSettings) -> Res<()> {
        self.initialize_http3_connection(conn)?;
        self.set_qpack_settings(&settings)?;
        self.settings_state = Http3RemoteSettingsState::ZeroRtt(settings);
        self.state = Http3State::ZeroRtt;
        Ok(())
    }

    /// Returns the settings for a connection. This is used for creating a resumption token.
    pub fn get_settings(&self) -> Option<HSettings> {
        if let Http3RemoteSettingsState::Received(settings) = &self.settings_state {
            Some(settings.clone())
        } else {
            None
        }
    }

    /// This function adds a new unidi stream and try to read its type. `Http3Connection` can handle
    /// a Http3 Control stream, Qpack streams and an unknown stream, but it cannot handle a Push stream.
    /// If a Push stream has been discovered, return true and let the `Http3Client`/`Server` handle it.
    pub fn handle_new_unidi_stream(&mut self, conn: &mut Connection, stream_id: u64) -> Res<bool> {
        qtrace!([self], "A new stream: {}.", stream_id);
        let stream_type;
        let fin;
        {
            let ns = self
                .new_streams
                .entry(stream_id)
                .or_insert_with(NewStreamTypeReader::new);
            stream_type = ns.get_type(conn, stream_id);
            fin = ns.fin();
        }

        if fin {
            self.new_streams.remove(&stream_id);
            Ok(false)
        } else {
            stream_type.map_or(Ok(false), |t| {
                self.new_streams.remove(&stream_id);
                self.decode_new_stream(conn, t, stream_id)
            })
        }
    }

    fn recv_decoder(&mut self, conn: &mut Connection, stream_id: u64) -> Res<bool> {
        if self.qpack_decoder.is_recv_stream(stream_id) {
            qdebug!(
                [self],
                "The qpack decoder stream ({}) is readable.",
                stream_id
            );
            let unblocked_streams = self.qpack_decoder.receive(conn, stream_id)?;
            for stream_id in unblocked_streams {
                qdebug!([self], "Stream {} is unblocked", stream_id);
                self.handle_read_stream(conn, stream_id, true)?;
            }
            Ok(true)
        } else {
            Ok(false)
        }
    }

    fn recv_control(&mut self, conn: &mut Connection, stream_id: u64) -> Res<HandleReadableOutput> {
        assert!(self.control_stream_remote.is_recv_stream(stream_id));
        let mut control_frames = Vec::new();

        loop {
            if let Some(f) = self.control_stream_remote.receive(conn)? {
                if let Some(f) = self.handle_control_frame(f)? {
                    control_frames.push(f);
                }
            } else if control_frames.is_empty() {
                return Ok(HandleReadableOutput::NoOutput);
            } else {
                return Ok(HandleReadableOutput::ControlFrames(control_frames));
            }
        }
    }

    /// This function handles reading from all streams, i.e. control, qpack, request/response
    /// stream and unidi stream that are still do not have a type.
    /// The function cannot handle:
    /// 1) a Push stream (if an unknown unidi stream is decoded to be a push stream)
    /// 2) frames `MaxPushId` or `Goaway` must be handled by `Http3Client`/`Server`.
    /// The function returns `HandleReadableOutput`.
    pub fn handle_stream_readable(
        &mut self,
        conn: &mut Connection,
        stream_id: u64,
    ) -> Res<HandleReadableOutput> {
        qtrace!([self], "Readable stream {}.", stream_id);

        let label = ::neqo_common::log_subject!(::log::Level::Debug, self);
        if self.handle_read_stream(conn, stream_id, false)? {
            qdebug!([label], "Request/response stream {} read.", stream_id);
            Ok(HandleReadableOutput::NoOutput)
        } else if self.control_stream_remote.is_recv_stream(stream_id) {
            qdebug!(
                [self],
                "The remote control stream ({}) is readable.",
                stream_id
            );
            self.recv_control(conn, stream_id)
        } else if self.qpack_encoder.recv_if_encoder_stream(conn, stream_id)? {
            qdebug!(
                [self],
                "The qpack encoder stream ({}) is readable.",
                stream_id
            );
            Ok(HandleReadableOutput::NoOutput)
        } else if self.recv_decoder(conn, stream_id)? {
            Ok(HandleReadableOutput::NoOutput)
        } else if let Some(ns) = self.new_streams.get_mut(&stream_id) {
            let stream_type = ns.get_type(conn, stream_id);
            let fin = ns.fin();
            if fin {
                self.new_streams.remove(&stream_id);
            }
            if let Some(t) = stream_type {
                self.new_streams.remove(&stream_id);
                self.decode_new_stream(conn, t, stream_id)?;
                // Make sure to read from this stream because DataReadable will not be set again.
                return match t {
                    HTTP3_UNI_STREAM_TYPE_CONTROL => self.recv_control(conn, stream_id),
                    QPACK_UNI_STREAM_TYPE_DECODER => {
                        self.qpack_encoder.recv_if_encoder_stream(conn, stream_id)?;
                        Ok(HandleReadableOutput::NoOutput)
                    }
                    QPACK_UNI_STREAM_TYPE_ENCODER => {
                        self.recv_decoder(conn, stream_id)?;
                        Ok(HandleReadableOutput::NoOutput)
                    }
                    HTTP3_UNI_STREAM_TYPE_PUSH => Ok(HandleReadableOutput::PushStream),
                    _ => Ok(HandleReadableOutput::NoOutput),
                };
            }

            Ok(HandleReadableOutput::NoOutput)
        } else {
            // For a new stream we receive NewStream event and a
            // RecvStreamReadable event.
            // In most cases we decode a new stream already on the NewStream
            // event and remove it from self.new_streams.
            // Therefore, while processing RecvStreamReadable there will be no
            // entry for the stream in self.new_streams.
            // This can be a push stream as well.
            Ok(HandleReadableOutput::StreamNotFound)
        }
    }

    fn is_critical_stream(&self, stream_id: u64) -> bool {
        self.qpack_encoder
            .local_stream_id()
            .iter()
            .chain(self.qpack_encoder.remote_stream_id().iter())
            .chain(self.qpack_decoder.local_stream_id().iter())
            .chain(self.qpack_decoder.remote_stream_id().iter())
            .chain(self.control_stream_local.stream_id().iter())
            .chain(self.control_stream_remote.stream_id().iter())
            .any(|id| stream_id == *id)
    }

    /// This is called when a RESET frame has been received.
    pub fn handle_stream_reset(&mut self, stream_id: u64, app_error: AppError) -> Res<()> {
        qinfo!(
            [self],
            "Handle a stream reset stream_id={} app_err={}",
            stream_id,
            app_error
        );

        if let Some(s) = self.recv_streams.remove(&stream_id) {
            s.stream_reset(app_error, &mut self.qpack_decoder, ResetType::Remote);
            Ok(())
        } else if self.is_critical_stream(stream_id) {
            Err(Error::HttpClosedCriticalStream)
        } else {
            Ok(())
        }
    }

    pub fn handle_stream_stop_sending(&mut self, stream_id: u64, app_error: AppError) -> Res<()> {
        qinfo!(
            [self],
            "Handle stream_stop_sending stream_id={} app_err={}",
            stream_id,
            app_error
        );

        if let Some(mut s) = self.send_streams.remove(&stream_id) {
            s.stop_sending(app_error);
            Ok(())
        } else if self.is_critical_stream(stream_id) {
            Err(Error::HttpClosedCriticalStream)
        } else {
            Ok(())
        }
    }

    /// This is called when `neqo_transport::Connection` state has been change to take proper actions in
    /// the HTTP3 layer.
    pub fn handle_state_change(&mut self, conn: &mut Connection, state: &State) -> Res<bool> {
        qdebug!([self], "Handle state change {:?}", state);
        match state {
            State::Connected => {
                debug_assert!(matches!(
                    self.state,
                    Http3State::Initializing | Http3State::ZeroRtt
                ));
                if self.state == Http3State::Initializing {
                    self.initialize_http3_connection(conn)?;
                }
                self.state = Http3State::Connected;
                Ok(true)
            }
            State::Closing { error, .. } | State::Draining { error, .. } => {
                if matches!(self.state, Http3State::Closing(_)| Http3State::Closed(_)) {
                    Ok(false)
                } else {
                    self.state = Http3State::Closing(error.clone().into());
                    Ok(true)
                }
            }
            State::Closed(error) => {
                if matches!(self.state, Http3State::Closed(_)) {
                    Ok(false)
                } else {
                    self.state = Http3State::Closed(error.clone().into());
                    Ok(true)
                }
            }
            _ => Ok(false),
        }
    }

    /// This is called when 0RTT has been reseted to clear `send_streams`, `recv_streams` and settings.
    pub fn handle_zero_rtt_rejected(&mut self) -> Res<()> {
        if self.state == Http3State::ZeroRtt {
            self.state = Http3State::Initializing;
            self.control_stream_local = ControlStreamLocal::new();
            self.control_stream_remote = ControlStreamRemote::new();
            self.new_streams.clear();
            self.qpack_encoder = QPackEncoder::new(self.local_qpack_settings, true);
            self.qpack_decoder = QPackDecoder::new(self.local_qpack_settings);
            self.settings_state = Http3RemoteSettingsState::NotReceived;
            self.streams_have_data_to_send.clear();
            // TODO: investigate whether this code can automatically retry failed transactions.
            self.send_streams.clear();
            self.recv_streams.clear();
            Ok(())
        } else {
            debug_assert!(false, "Zero rtt rejected in the wrong state.");
            Err(Error::HttpInternal)
        }
    }

    fn handle_read_stream(
        &mut self,
        conn: &mut Connection,
        stream_id: u64,
        header_unblocked: bool,
    ) -> Res<bool> {
        let label = ::neqo_common::log_subject!(::log::Level::Info, self);

        let r = self.recv_streams.get_mut(&stream_id);

        if r.is_none() {
            return Ok(false);
        }

        let recv_stream = r.unwrap();
        qinfo!(
            [label],
            "Request/response stream {} is readable.",
            stream_id
        );
        if header_unblocked {
            recv_stream.header_unblocked(conn, &mut self.qpack_decoder)?;
        } else {
            recv_stream.receive(conn, &mut self.qpack_decoder)?;
        }
        if recv_stream.done() {
            self.recv_streams.remove(&stream_id);
        }
        Ok(true)
    }

    /// Returns true if it is a push stream.
    fn decode_new_stream(
        &mut self,
        conn: &mut Connection,
        stream_type: u64,
        stream_id: u64,
    ) -> Res<bool> {
        match stream_type {
            HTTP3_UNI_STREAM_TYPE_CONTROL => {
                self.control_stream_remote.add_remote_stream(stream_id)?;
                Ok(false)
            }

            HTTP3_UNI_STREAM_TYPE_PUSH => {
                qinfo!([self], "A new push stream {}.", stream_id);
                Ok(true)
            }
            QPACK_UNI_STREAM_TYPE_ENCODER => {
                qinfo!([self], "A new remote qpack encoder stream {}", stream_id);
                Error::map_error(
                    self.qpack_decoder.add_recv_stream(stream_id),
                    Error::HttpStreamCreation,
                )?;
                Ok(false)
            }
            QPACK_UNI_STREAM_TYPE_DECODER => {
                qinfo!([self], "A new remote qpack decoder stream {}", stream_id);
                Error::map_error(
                    self.qpack_encoder.add_recv_stream(stream_id),
                    Error::HttpStreamCreation,
                )?;
                Ok(false)
            }
            _ => {
                conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?;
                Ok(false)
            }
        }
    }

    /// This is called when an application closes the connection.
    pub fn close(&mut self, error: AppError) {
        qinfo!([self], "Close connection error {:?}.", error);
        self.state = Http3State::Closing(CloseError::Application(error));
        if (!self.send_streams.is_empty() || !self.recv_streams.is_empty()) && (error == 0) {
            qwarn!("close(0) called when streams still active");
        }
        self.send_streams.clear();
        self.recv_streams.clear();
    }

    /// This is called when an application resets a stream.
    /// The application reset will close both sides.
    pub fn stream_reset(
        &mut self,
        conn: &mut Connection,
        stream_id: u64,
        error: AppError,
    ) -> Res<()> {
        qinfo!([self], "Reset stream {} error={}.", stream_id, error);

        let mut found = self.send_streams.remove(&stream_id).is_some();
        if let Some(s) = self.recv_streams.remove(&stream_id) {
            s.stream_reset(error, &mut self.qpack_decoder, ResetType::App);
            found = true;
        }

        // Stream maybe already be closed and we may get an error here, but we do not care.
        let _ = conn.stream_reset_send(stream_id, error);
        // Stream maybe already be closed and we may get an error here, but we do not care.
        let _ = conn.stream_stop_sending(stream_id, error);
        if found {
            Ok(())
        } else {
            Err(Error::InvalidStreamId)
        }
    }

    /// This is called when an application wants to close the sending side of a stream.
    pub fn stream_close_send(&mut self, conn: &mut Connection, stream_id: u64) -> Res<()> {
        qinfo!([self], "Close the sending side for stream {}.", stream_id);
        debug_assert!(self.state.active());
        let send_stream = self
            .send_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?;
        // The following funcion may return InvalidStreamId from the transport layer if the stream has been cloesd
        // already. It is ok to ignore it here.
        let _ = send_stream.close(conn);
        if send_stream.done() {
            self.send_streams.remove(&stream_id);
        }
        Ok(())
    }

    // If the control stream has received frames MaxPushId or Goaway which handling is specific to
    // the client and server, we must give them to the specific client/server handler.
    fn handle_control_frame(&mut self, f: HFrame) -> Res<Option<HFrame>> {
        qinfo!([self], "Handle a control frame {:?}", f);
        if !matches!(f, HFrame::Settings { .. })
            && !matches!(self.settings_state, Http3RemoteSettingsState::Received{..})
        {
            return Err(Error::HttpMissingSettings);
        }
        match f {
            HFrame::Settings { settings } => {
                self.handle_settings(settings)?;
                Ok(None)
            }
            HFrame::Goaway { .. } | HFrame::MaxPushId { .. } | HFrame::CancelPush { .. } => {
                Ok(Some(f))
            }
            _ => Err(Error::HttpFrameUnexpected),
        }
    }

    fn set_qpack_settings(&mut self, settings: &[HSetting]) -> Res<()> {
        for s in settings {
            qinfo!([self], " {:?} = {:?}", s.setting_type, s.value);
            match s.setting_type {
                HSettingType::MaxTableCapacity => self.qpack_encoder.set_max_capacity(s.value)?,
                HSettingType::BlockedStreams => {
                    self.qpack_encoder.set_max_blocked_streams(s.value)?
                }
                HSettingType::MaxHeaderListSize => (),
            }
        }
        Ok(())
    }

    fn handle_settings(&mut self, new_settings: HSettings) -> Res<()> {
        qinfo!([self], "Handle SETTINGS frame.");
        match &self.settings_state {
            Http3RemoteSettingsState::NotReceived => {
                self.set_qpack_settings(&new_settings)?;
                self.settings_state = Http3RemoteSettingsState::Received(new_settings);
                Ok(())
            }
            Http3RemoteSettingsState::ZeroRtt(settings) => {
                let mut qpack_changed = false;
                for st in &[
                    HSettingType::MaxHeaderListSize,
                    HSettingType::MaxTableCapacity,
                    HSettingType::BlockedStreams,
                ] {
                    let zero_rtt_value = settings.get(*st);
                    let new_value = new_settings.get(*st);
                    if zero_rtt_value == new_value {
                        continue;
                    }
                    if zero_rtt_value > new_value {
                        qerror!(
                            [self],
                            "The new({}) and the old value({}) of setting {:?} do not match",
                            new_value,
                            zero_rtt_value,
                            st
                        );
                        return Err(Error::HttpSettings);
                    }

                    match st {
                        HSettingType::MaxTableCapacity => {
                            if zero_rtt_value != 0 {
                                return Err(Error::QpackError(neqo_qpack::Error::DecoderStream));
                            }
                            qpack_changed = true;
                        }
                        HSettingType::BlockedStreams => qpack_changed = true,
                        _ => (),
                    }
                }
                if qpack_changed {
                    qdebug!([self], "Settings after zero rtt differ.");
                    self.set_qpack_settings(&(new_settings))?;
                }
                self.settings_state = Http3RemoteSettingsState::Received(new_settings);
                Ok(())
            }
            Http3RemoteSettingsState::Received { .. } => Err(Error::HttpFrameUnexpected),
        }
    }

    /// Return the current state on `Http3Connection`.
    pub fn state(&self) -> Http3State {
        self.state.clone()
    }

    /// Adds a new send and receive stream.
    pub fn add_streams(
        &mut self,
        stream_id: u64,
        send_stream: SendMessage,
        recv_stream: Box<dyn RecvStream>,
    ) {
        if send_stream.has_data_to_send() {
            self.streams_have_data_to_send.insert(stream_id);
        }
        self.send_streams.insert(stream_id, send_stream);
        self.recv_streams.insert(stream_id, recv_stream);
    }

    /// Add a new recv stream. This is used for push streams.
    pub fn add_recv_stream(&mut self, stream_id: u64, recv_stream: Box<dyn RecvStream>) {
        self.recv_streams.insert(stream_id, recv_stream);
    }

    pub fn queue_control_frame(&mut self, frame: &HFrame) {
        self.control_stream_local.queue_frame(frame);
    }
}