1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![allow(clippy::module_name_repetitions)]
use crate::connection::Http3State;
use crate::connection_server::Http3ServerHandler;
use crate::{Header, Res};
use neqo_common::{qdebug, qinfo};
use neqo_transport::server::ActiveConnectionRef;
use neqo_transport::{AppError, Connection};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
/// Handle to one client request stream on an HTTP/3 server connection.
///
/// Pairs a connection reference and the shared `Http3ServerHandler` with a
/// stream id, so that a response, a stop-sending request or a reset can be
/// issued for that stream. Cloning copies the `Rc`, so clones share the same
/// handler.
#[derive(Debug, Clone)]
pub struct ClientRequestStream {
/// Connection on which stream-level operations are issued.
conn: ActiveConnectionRef,
/// Shared, interior-mutable HTTP/3 handler that responses and resets go through.
handler: Rc<RefCell<Http3ServerHandler>>,
/// Identifier of the request stream this handle refers to.
stream_id: u64,
}
impl ::std::fmt::Display for ClientRequestStream {
    /// Writes a short label made of the connection's `Debug` representation
    /// and the stream id; this is the context used by the logging macros.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        // Keep the borrow guard in a named local so the plain `&Connection`
        // derived from it stays valid for the whole formatting call.
        let guard = self.conn.borrow();
        let connection: &Connection = &guard;
        f.write_fmt(format_args!(
            "Http3 server conn={:?} stream_id={}",
            connection, self.stream_id
        ))
    }
}
impl ClientRequestStream {
pub(crate) fn new(
conn: ActiveConnectionRef,
handler: Rc<RefCell<Http3ServerHandler>>,
stream_id: u64,
) -> Self {
Self {
conn,
handler,
stream_id,
}
}
/// Supply a response to a request.
pub fn set_response(&mut self, headers: &[Header], data: &[u8]) -> Res<()> {
qinfo!([self], "Set new response.");
self.handler
.borrow_mut()
.set_response(self.stream_id, headers, data)
}
/// Request a peer to stop sending a request.
pub fn stream_stop_sending(&mut self, app_error: AppError) -> Res<()> {
qdebug!(
[self],
"stop sending stream_id:{} error:{}.",
self.stream_id,
app_error
);
self.conn
.borrow_mut()
.stream_stop_sending(self.stream_id, app_error)?;
Ok(())
}
/// Reset a stream/request.
pub fn stream_reset(&mut self, app_error: AppError) -> Res<()> {
qdebug!([self], "reset error:{}.", app_error);
self.handler.borrow_mut().stream_reset(
&mut self.conn.borrow_mut(),
self.stream_id,
app_error,
)
}
}
/// Events produced by the HTTP/3 server and queued in `Http3ServerEvents`.
#[derive(Debug, Clone)]
pub enum Http3ServerEvent {
/// Headers are ready.
Headers {
/// Handle for the request stream the headers arrived on.
request: ClientRequestStream,
/// The received headers.
headers: Vec<Header>,
/// NOTE(review): presumably `true` when the stream ended with these
/// headers — confirm against the code that emits this event.
fin: bool,
},
/// Request data is ready.
Data {
/// Handle for the request stream the data arrived on.
request: ClientRequestStream,
/// The received bytes.
data: Vec<u8>,
/// NOTE(review): presumably `true` when this is the last data on the
/// stream — confirm against the code that emits this event.
fin: bool,
},
/// An individual connection changed state. It is only used for tests.
StateChange {
/// The connection whose state changed.
conn: ActiveConnectionRef,
/// The state reported for that connection.
state: Http3State,
},
}
/// FIFO queue of `Http3ServerEvent`s.
///
/// The queue lives behind `Rc<RefCell<..>>`, so cloning this struct yields
/// another handle to the same underlying queue rather than a copy of it.
#[derive(Debug, Default, Clone)]
pub struct Http3ServerEvents {
/// Pending events, oldest at the front.
events: Rc<RefCell<VecDeque<Http3ServerEvent>>>,
}
impl Http3ServerEvents {
    /// Appends `event` to the back of the queue.
    fn insert(&self, event: Http3ServerEvent) {
        let mut queue = self.events.borrow_mut();
        queue.push_back(event);
    }

    /// Removes every queued event and returns them, oldest first.
    pub fn events(&self) -> impl Iterator<Item = Http3ServerEvent> {
        // Swap in a fresh, empty queue; the mutable borrow ends with this statement.
        let drained = ::std::mem::replace(&mut *self.events.borrow_mut(), VecDeque::new());
        drained.into_iter()
    }

    /// Returns `true` if at least one event is queued.
    pub fn has_events(&self) -> bool {
        let queue = self.events.borrow();
        !queue.is_empty()
    }

    /// Removes and returns the oldest queued event, if there is one.
    pub fn next_event(&self) -> Option<Http3ServerEvent> {
        let mut queue = self.events.borrow_mut();
        queue.pop_front()
    }

    /// Queues a `Headers` event.
    pub(crate) fn headers(&self, request: ClientRequestStream, headers: Vec<Header>, fin: bool) {
        let event = Http3ServerEvent::Headers {
            request,
            headers,
            fin,
        };
        self.insert(event);
    }

    /// Queues a `StateChange` event.
    pub(crate) fn connection_state_change(&self, conn: ActiveConnectionRef, state: Http3State) {
        let event = Http3ServerEvent::StateChange { conn, state };
        self.insert(event);
    }

    /// Queues a `Data` event.
    pub(crate) fn data(&self, request: ClientRequestStream, data: Vec<u8>, fin: bool) {
        let event = Http3ServerEvent::Data { request, data, fin };
        self.insert(event);
    }
}
|