diff options
Diffstat (limited to '')
-rw-r--r-- | netwerk/test/http3server/src/main.rs | 620 |
1 files changed, 620 insertions, 0 deletions
diff --git a/netwerk/test/http3server/src/main.rs b/netwerk/test/http3server/src/main.rs new file mode 100644 index 0000000000..ae953e6ec0 --- /dev/null +++ b/netwerk/test/http3server/src/main.rs @@ -0,0 +1,620 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![deny(warnings)] + +use neqo_common::{event::Provider, qdebug, qinfo, qtrace, Datagram}; +use neqo_crypto::{init_db, AllowZeroRtt, AntiReplay}; +use neqo_http3::{Error, Http3Server, Http3ServerEvent}; +use neqo_qpack::QpackSettings; +use neqo_transport::server::Server; +use neqo_transport::{ConnectionEvent, ConnectionParameters, FixedConnectionIdManager, Output}; +use std::env; + +use std::cell::RefCell; +use std::io; +use std::path::PathBuf; +use std::process::exit; +use std::rc::Rc; +use std::thread; +use std::time::{Duration, Instant}; + +use core::fmt::Display; +use mio::net::UdpSocket; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio_extras::timer::{Builder, Timeout, Timer}; +use std::collections::HashMap; +use std::collections::HashSet; +use std::mem; +use std::net::SocketAddr; + +const MAX_TABLE_SIZE: u64 = 65536; +const MAX_BLOCKED_STREAMS: u16 = 10; +const PROTOCOLS: &[&str] = &["h3-27"]; +const TIMER_TOKEN: Token = Token(0xffff_ffff); + +const HTTP_RESPONSE_WITH_WRONG_FRAME: &[u8] = &[ + 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // headers + 0x0, 0x3, 0x61, 0x62, 0x63, // the first data frame + 0x3, 0x1, 0x5, // a cancel push frame that is not allowed +]; + +trait HttpServer: Display { + fn process(&mut self, dgram: Option<Datagram>) -> Output; + fn process_events(&mut self); +} + +struct Http3TestServer { + server: Http3Server, + // This a map from a post request to amount of data ithas been received on the request. + // The respons will carry the amount of data received. + posts: HashMap<String, usize>, +} + +impl ::std::fmt::Display for Http3TestServer { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "{}", self.server) + } +} + +impl Http3TestServer { + pub fn new(server: Http3Server) -> Self { + Self { + server, + posts: HashMap::new(), + } + } +} + +impl HttpServer for Http3TestServer { + fn process(&mut self, dgram: Option<Datagram>) -> Output { + self.server.process(dgram, Instant::now()) + } + + fn process_events(&mut self) { + while let Some(event) = self.server.next_event() { + qtrace!("Event: {:?}", event); + match event { + Http3ServerEvent::Headers { + mut request, + headers, + fin, + } => { + qtrace!("Headers (request={} fin={}): {:?}", request, fin, headers); + + let default_ret = b"Hello World".to_vec(); + let default_headers = vec![ + (String::from(":status"), String::from("200")), + (String::from("cache-control"), String::from("no-cache")), + ( + String::from("content-length"), + default_ret.len().to_string(), + ), + ]; + + let path_hdr = headers.iter().find(|(k, _)| k == ":path"); + match path_hdr { + Some((_, path)) if !path.is_empty() => { + qtrace!("Serve request {}", path); + if path == "/Response421" { + let response_body = b"0123456789".to_vec(); + request + .set_response( + &[ + (String::from(":status"), String::from("421")), + ( + String::from("cache-control"), + String::from("no-cache"), + ), + ( + String::from("content-length"), + response_body.len().to_string(), + ), + ], + &response_body, + ) + .unwrap(); + } else if path == "/RequestCancelled" { + request + .stream_reset(Error::HttpRequestCancelled.code()) + .unwrap(); + } else if path == "/VersionFallback" { + request + .stream_reset(Error::HttpVersionFallback.code()) + .unwrap(); + } else if path == "/EarlyResponse" { + request.stream_reset(Error::HttpNoError.code()).unwrap(); + } else if path == "/RequestRejected" { + request + .stream_reset(Error::HttpRequestRejected.code()) + .unwrap(); + } else if path == "/.well-known/http-opportunistic" { + let host_hdr = headers.iter().find(|(k, _)| k == ":authority"); + match host_hdr { + Some((_, host)) if !host.is_empty() => { + let mut content = b"[\"http://".to_vec(); + content.extend(host.as_bytes()); + content.extend(b"\"]".to_vec()); + request + .set_response( + &[ + (String::from(":status"), String::from("200")), + ( + String::from("cache-control"), + String::from("no-cache"), + ), + ( + String::from("content-type"), + String::from("application/json"), + ), + ( + String::from("content-length"), + content.len().to_string(), + ), + ], + &content, + ) + .unwrap(); + } + _ => request + .set_response(&default_headers, &default_ret) + .unwrap(), + } + } else if path == "/no_body" { + request + .set_response( + &[ + (String::from(":status"), String::from("200")), + ( + String::from("cache-control"), + String::from("no-cache"), + ), + ], + &[], + ) + .unwrap(); + } else if path == "/no_content_length" { + request + .set_response( + &[ + (String::from(":status"), String::from("200")), + ( + String::from("cache-control"), + String::from("no-cache"), + ), + ], + &vec![b'a'; 4000], + ) + .unwrap(); + } else if path == "/content_length_smaller" { + request + .set_response( + &[ + (String::from(":status"), String::from("200")), + ( + String::from("cache-control"), + String::from("no-cache"), + ), + (String::from("content-length"), 4000.to_string()), + ], + &vec![b'a'; 8000], + ) + .unwrap(); + } else if path == "/post" { + // Read all data before responding. + self.posts.insert(format!("{}", request), 0); + } else { + match path.trim_matches(|p| p == '/').parse::<usize>() { + Ok(v) => request + .set_response( + &[ + (String::from(":status"), String::from("200")), + ( + String::from("cache-control"), + String::from("no-cache"), + ), + (String::from("content-length"), v.to_string()), + ], + &vec![b'a'; v], + ) + .unwrap(), + Err(_) => request + .set_response(&default_headers, &default_ret) + .unwrap(), + } + } + } + _ => { + request + .set_response(&default_headers, &default_ret) + .unwrap(); + } + } + } + Http3ServerEvent::Data { + mut request, + data, + fin, + } => { + if let Some(r) = self.posts.get_mut(&format!("{}", request)) { + *r += data.len(); + } + if fin { + if let Some(r) = self.posts.remove(&format!("{}", request)) { + let default_ret = b"Hello World".to_vec(); + request + .set_response( + &[ + (String::from(":status"), String::from("200")), + (String::from("cache-control"), String::from("no-cache")), + (String::from("x-data-received-length"), r.to_string()), + ( + String::from("content-length"), + default_ret.len().to_string(), + ), + ], + &default_ret, + ) + .unwrap(); + } + } + } + _ => {} + } + } + } +} + +impl HttpServer for Server { + fn process(&mut self, dgram: Option<Datagram>) -> Output { + self.process(dgram, Instant::now()) + } + + fn process_events(&mut self) { + let active_conns = self.active_connections(); + for mut acr in active_conns { + loop { + let event = match acr.borrow_mut().next_event() { + None => break, + Some(e) => e, + }; + match event { + ConnectionEvent::RecvStreamReadable { stream_id } => { + if stream_id % 4 == 0 { + // We are only interesting in request streams + acr.borrow_mut() + .stream_send(stream_id, HTTP_RESPONSE_WITH_WRONG_FRAME) + .expect("Read should succeed"); + } + } + _ => {} + } + } + } + } +} + +#[derive(Default)] +struct NonRespondingServer {} + +impl ::std::fmt::Display for NonRespondingServer { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "NonRespondingServer") + } +} + +impl HttpServer for NonRespondingServer { + fn process(&mut self, _dgram: Option<Datagram>) -> Output { + Output::None + } + + fn process_events(&mut self) {} +} + +fn emit_packet(socket: &UdpSocket, out_dgram: Datagram) { + let sent = socket + .send_to(&out_dgram, &out_dgram.destination()) + .expect("Error sending datagram"); + if sent != out_dgram.len() { + qinfo!("Unable to send all {} bytes of datagram", out_dgram.len()); + } +} + +fn process( + server: &mut dyn HttpServer, + svr_timeout: &mut Option<Timeout>, + inx: usize, + dgram: Option<Datagram>, + timer: &mut Timer<usize>, + socket: &mut UdpSocket, +) -> bool { + match server.process(dgram) { + Output::Datagram(dgram) => { + emit_packet(socket, dgram); + true + } + Output::Callback(new_timeout) => { + if let Some(svr_timeout) = svr_timeout { + timer.cancel_timeout(svr_timeout); + } + + qinfo!("Setting timeout of {:?} for {}", new_timeout, server); + *svr_timeout = Some(timer.set_timeout(new_timeout, inx)); + false + } + Output::None => { + qdebug!("Output::None"); + false + } + } +} + +fn read_dgram( + socket: &mut UdpSocket, + local_address: &SocketAddr, +) -> Result<Option<Datagram>, io::Error> { + let buf = &mut [0u8; 2048]; + let (sz, remote_addr) = match socket.recv_from(&mut buf[..]) { + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(None), + Err(err) => { + eprintln!("UDP recv error: {:?}", err); + return Err(err); + } + Ok(res) => res, + }; + + if sz == buf.len() { + eprintln!("Might have received more than {} bytes", buf.len()); + } + + if sz == 0 { + eprintln!("zero length datagram received?"); + Ok(None) + } else { + Ok(Some(Datagram::new(remote_addr, *local_address, &buf[..sz]))) + } +} + +enum ServerType { + Http3, + Http3Fail, + Http3NoResponse, +} + +struct ServersRunner { + hosts: Vec<SocketAddr>, + poll: Poll, + sockets: Vec<UdpSocket>, + servers: HashMap<SocketAddr, (Box<dyn HttpServer>, Option<Timeout>)>, + timer: Timer<usize>, + active_servers: HashSet<usize>, +} + +impl ServersRunner { + pub fn new() -> Result<Self, io::Error> { + Ok(Self { + hosts: Vec::new(), + poll: Poll::new()?, + sockets: Vec::new(), + servers: HashMap::new(), + timer: Builder::default() + .tick_duration(Duration::from_millis(1)) + .build::<usize>(), + active_servers: HashSet::new(), + }) + } + + pub fn init(&mut self) { + self.add_new_socket(0, ServerType::Http3); + self.add_new_socket(1, ServerType::Http3Fail); + self.add_new_socket(3, ServerType::Http3NoResponse); + println!( + "HTTP3 server listening on ports {}, {} and {}", + self.hosts[0].port(), + self.hosts[1].port(), + self.hosts[2].port() + ); + self.poll + .register(&self.timer, TIMER_TOKEN, Ready::readable(), PollOpt::edge()) + .unwrap(); + } + + fn add_new_socket(&mut self, count: usize, server_type: ServerType) -> u16 { + let addr = "127.0.0.1:0".parse().unwrap(); + + let socket = match UdpSocket::bind(&addr) { + Err(err) => { + eprintln!("Unable to bind UDP socket: {}", err); + exit(1) + } + Ok(s) => s, + }; + + let local_addr = match socket.local_addr() { + Err(err) => { + eprintln!("Socket local address not bound: {}", err); + exit(1) + } + Ok(s) => s, + }; + + self.hosts.push(local_addr); + + self.poll + .register( + &socket, + Token(count), + Ready::readable() | Ready::writable(), + PollOpt::edge(), + ) + .unwrap(); + + self.sockets.push(socket); + self.servers + .insert(local_addr, (self.create_server(server_type), None)); + local_addr.port() + } + + fn create_server(&self, server_type: ServerType) -> Box<dyn HttpServer> { + let anti_replay = AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14) + .expect("unable to setup anti-replay"); + let cid_mgr = Rc::new(RefCell::new(FixedConnectionIdManager::new(10))); + + match server_type { + ServerType::Http3 => Box::new(Http3TestServer::new( + Http3Server::new( + Instant::now(), + &[" HTTP2 Test Cert"], + PROTOCOLS, + anti_replay, + cid_mgr, + QpackSettings { + max_table_size_encoder: MAX_TABLE_SIZE, + max_table_size_decoder: MAX_TABLE_SIZE, + max_blocked_streams: MAX_BLOCKED_STREAMS, + }, + ) + .expect("We cannot make a server!"), + )), + ServerType::Http3Fail => Box::new( + Server::new( + Instant::now(), + &[" HTTP2 Test Cert"], + PROTOCOLS, + anti_replay, + Box::new(AllowZeroRtt {}), + cid_mgr, + ConnectionParameters::default(), + ) + .expect("We cannot make a server!"), + ), + ServerType::Http3NoResponse => Box::new(NonRespondingServer::default()), + } + } + + fn process_datagrams_and_events( + &mut self, + inx: usize, + read_socket: bool, + ) -> Result<(), io::Error> { + if let Some(socket) = self.sockets.get_mut(inx) { + if let Some((ref mut server, svr_timeout)) = + self.servers.get_mut(&socket.local_addr().unwrap()) + { + if read_socket { + loop { + let dgram = read_dgram(socket, &self.hosts[inx])?; + if dgram.is_none() { + break; + } + let _ = process( + &mut **server, + svr_timeout, + inx, + dgram, + &mut self.timer, + socket, + ); + } + } else { + let _ = process( + &mut **server, + svr_timeout, + inx, + None, + &mut self.timer, + socket, + ); + } + server.process_events(); + if process( + &mut **server, + svr_timeout, + inx, + None, + &mut self.timer, + socket, + ) { + self.active_servers.insert(inx); + } + } + } + Ok(()) + } + + fn process_active_conns(&mut self) -> Result<(), io::Error> { + let curr_active = mem::take(&mut self.active_servers); + for inx in curr_active { + self.process_datagrams_and_events(inx, false)?; + } + Ok(()) + } + + fn process_timeout(&mut self) -> Result<(), io::Error> { + while let Some(inx) = self.timer.poll() { + qinfo!("Timer expired for {:?}", inx); + self.process_datagrams_and_events(inx, false)?; + } + Ok(()) + } + + pub fn run(&mut self) -> Result<(), io::Error> { + let mut events = Events::with_capacity(1024); + loop { + // If there are active servers do not block in poll. + self.poll.poll( + &mut events, + if self.active_servers.is_empty() { + None + } else { + Some(Duration::from_millis(0)) + }, + )?; + + for event in &events { + if event.token() == TIMER_TOKEN { + self.process_timeout()?; + } else { + if !event.readiness().is_readable() { + continue; + } + self.process_datagrams_and_events(event.token().0, true)?; + } + } + self.process_active_conns()?; + } + } +} + +fn main() -> Result<(), io::Error> { + let args: Vec<String> = env::args().collect(); + if args.len() < 2 { + eprintln!("Wrong arguments."); + exit(1) + } + + // Read data from stdin and terminate the server if EOF is detected, which + // means that runxpcshelltests.py ended without shutting down the server. + thread::spawn(|| loop { + let mut buffer = String::new(); + match io::stdin().read_line(&mut buffer) { + Ok(n) => { + if n == 0 { + exit(0); + } + } + Err(_) => { + exit(0); + } + } + }); + + init_db(PathBuf::from(args[1].clone())); + + let mut servers_runner = ServersRunner::new()?; + servers_runner.init(); + servers_runner.run() +} |