summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-http3/src/push_stream.rs
blob: 7bcce22a01e1306fe55d4f93ef49dc061a69ea06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::client_events::Http3ClientEvents;
use crate::push_controller::{PushController, RecvPushEvents};
use crate::recv_message::{MessageType, RecvMessage};
use crate::stream_type_reader::NewStreamTypeReader;
use crate::{Error, RecvStream, Res, ResetType};
use neqo_qpack::decoder::QPackDecoder;
use neqo_transport::{AppError, Connection};
use std::cell::RefCell;
use std::fmt::Display;
use std::rc::Rc;

#[derive(Debug)]
enum PushStreamState {
    ReadPushId(NewStreamTypeReader),
    ReadResponse { push_id: u64, response: RecvMessage },
    Closed,
}

impl PushStreamState {
    pub fn push_id(&self) -> Option<u64> {
        match self {
            Self::ReadResponse { push_id, .. } => Some(*push_id),
            _ => None,
        }
    }
}

// The `PushController` keeps information about all push streams. Each push stream is responsible for contacting the
// `PushController` to consult it about the push state and to inform it when the push stream is done (this are signal
// from the peer: stream has been closed or reset). `PushController` handles CANCEL_PUSH frames and canceling
// push from applications and PUSH_PROMISE frames.
//
// `PushStream` is responsible for reading from a push stream. It is used for reading push_id as well.
// It is created when a new push stream is received.
//
// After push_id has been read, The `PushController` is informed about the stream and its push_id. This is done by
// calling `add_new_push_stream`. `add_new_push_stream` may return an error (this will be a connection error)
// or a bool. true means that the streams should continue and false means that the stream should be reset(the stream
// will be canceled if the push has been canceled already (CANCEL_PUSH frame or canceling push from the application)
//
// `PushStreams` are kept in Http3Connection::recv_streams the same as a normal request/response stream.
// Http3Connection and read_data is responsible for reading the push data.
//
// PushHeaderReady and PushDataReadable are posted through the `PushController` that may decide to postpone them if
// a push_promise has not been received for the stream.
//
// `PushStream` is responsible for removing itself from the `PushController`.
//
// `PushStream` may be reset from the peer in the same way as a request stream. The `PushStream` informs the
// `PushController` that will set the push state to closed and remove any push events.

#[derive(Debug)]
pub(crate) struct PushStream {
    state: PushStreamState,
    stream_id: u64,
    push_handler: Rc<RefCell<PushController>>,
    events: Http3ClientEvents,
}

impl PushStream {
    pub fn new(
        stream_id: u64,
        push_handler: Rc<RefCell<PushController>>,
        events: Http3ClientEvents,
    ) -> Self {
        Self {
            state: PushStreamState::ReadPushId(NewStreamTypeReader::new()),
            stream_id,
            push_handler,
            events,
        }
    }
}

impl Display for PushStream {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "Push stream {:?}", self.stream_id)
    }
}

impl RecvStream for PushStream {
    fn receive(&mut self, conn: &mut Connection, decoder: &mut QPackDecoder) -> Res<()> {
        loop {
            match &mut self.state {
                PushStreamState::ReadPushId(id_reader) => {
                    let push_id = id_reader.get_type(conn, self.stream_id);
                    let fin = id_reader.fin();
                    if fin {
                        self.state = PushStreamState::Closed;
                        return Ok(());
                    }
                    if let Some(p) = push_id {
                        if self
                            .push_handler
                            .borrow_mut()
                            .add_new_push_stream(p, self.stream_id)?
                        {
                            self.state = PushStreamState::ReadResponse {
                                push_id: p,
                                response: RecvMessage::new(
                                    MessageType::Response,
                                    self.stream_id,
                                    Box::new(RecvPushEvents::new(p, self.push_handler.clone())),
                                    None,
                                ),
                            };
                        } else {
                            let _ = conn.stream_stop_sending(
                                self.stream_id,
                                Error::HttpRequestCancelled.code(),
                            );
                            self.state = PushStreamState::Closed;
                            return Ok(());
                        }
                    }
                }
                PushStreamState::ReadResponse { response, push_id } => {
                    response.receive(conn, decoder)?;
                    if response.done() {
                        self.push_handler.borrow_mut().close(*push_id);
                        self.state = PushStreamState::Closed;
                    }
                    return Ok(());
                }
                _ => return Ok(()),
            }
        }
    }

    fn header_unblocked(&mut self, conn: &mut Connection, decoder: &mut QPackDecoder) -> Res<()> {
        self.receive(conn, decoder)
    }
    fn done(&self) -> bool {
        matches!(self.state, PushStreamState::Closed)
    }

    fn stream_reset(&self, app_error: AppError, decoder: &mut QPackDecoder, reset_type: ResetType) {
        if !self.done() {
            decoder.cancel_stream(self.stream_id);
        }
        match reset_type {
            ResetType::App => {}
            t => {
                if let Some(push_id) = self.state.push_id() {
                    self.push_handler
                        .borrow_mut()
                        .push_stream_reset(push_id, app_error, t);
                }
            }
        }
    }

    fn read_data(
        &mut self,
        conn: &mut Connection,
        decoder: &mut QPackDecoder,
        buf: &mut [u8],
    ) -> Res<(usize, bool)> {
        if let PushStreamState::ReadResponse { response, push_id } = &mut self.state {
            let res = response.read_data(conn, decoder, buf);
            if response.done() {
                self.push_handler.borrow_mut().close(*push_id);
            }
            res
        } else {
            Err(Error::InvalidStreamId)
        }
    }
}