diff options
Diffstat (limited to 'third_party/rust/warp/tests/ws.rs')
-rw-r--r-- | third_party/rust/warp/tests/ws.rs | 288 |
1 files changed, 288 insertions, 0 deletions
diff --git a/third_party/rust/warp/tests/ws.rs b/third_party/rust/warp/tests/ws.rs new file mode 100644 index 0000000000..d5b60356e4 --- /dev/null +++ b/third_party/rust/warp/tests/ws.rs @@ -0,0 +1,288 @@ +#![deny(warnings)] + +use futures_util::{FutureExt, SinkExt, StreamExt}; +use serde_derive::Deserialize; +use warp::ws::Message; +use warp::Filter; + +#[tokio::test] +async fn upgrade() { + let _ = pretty_env_logger::try_init(); + + let route = warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|_| async {})); + + // From https://tools.ietf.org/html/rfc6455#section-1.2 + let key = "dGhlIHNhbXBsZSBub25jZQ=="; + let accept = "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="; + + let resp = warp::test::request() + .header("connection", "upgrade") + .header("upgrade", "websocket") + .header("sec-websocket-version", "13") + .header("sec-websocket-key", key) + .reply(&route) + .await; + + assert_eq!(resp.status(), 101); + assert_eq!(resp.headers()["connection"], "upgrade"); + assert_eq!(resp.headers()["upgrade"], "websocket"); + assert_eq!(resp.headers()["sec-websocket-accept"], accept); + + let resp = warp::test::request() + .header("connection", "keep-alive, Upgrade") + .header("upgrade", "Websocket") + .header("sec-websocket-version", "13") + .header("sec-websocket-key", key) + .reply(&route) + .await; + + assert_eq!(resp.status(), 101); +} + +#[tokio::test] +async fn fail() { + let _ = pretty_env_logger::try_init(); + + let route = warp::any().map(warp::reply); + + warp::test::ws() + .handshake(route) + .await + .expect_err("handshake non-websocket route should fail"); +} + +#[tokio::test] +async fn text() { + let _ = pretty_env_logger::try_init(); + + let mut client = warp::test::ws() + .handshake(ws_echo()) + .await + .expect("handshake"); + + client.send_text("hello warp").await; + + let msg = client.recv().await.expect("recv"); + assert_eq!(msg.to_str(), Ok("hello warp")); +} + +#[tokio::test] +async fn binary() { + let _ = pretty_env_logger::try_init(); + + let mut client = warp::test::ws() + .handshake(ws_echo()) + .await + .expect("handshake"); + + client.send(warp::ws::Message::binary(&b"bonk"[..])).await; + let msg = client.recv().await.expect("recv"); + assert!(msg.is_binary()); + assert_eq!(msg.as_bytes(), &b"bonk"[..]); +} + +#[tokio::test] +async fn wsclient_sink_and_stream() { + let _ = pretty_env_logger::try_init(); + + let mut client = warp::test::ws() + .handshake(ws_echo()) + .await + .expect("handshake"); + + let message = warp::ws::Message::text("hello"); + SinkExt::send(&mut client, message.clone()).await.unwrap(); + let received_message = client.next().await.unwrap().unwrap(); + assert_eq!(message, received_message); +} + +#[tokio::test] +async fn close_frame() { + let _ = pretty_env_logger::try_init(); + + let route = warp::ws().map(|ws: warp::ws::Ws| { + ws.on_upgrade(|mut websocket| async move { + let msg = websocket.next().await.expect("item").expect("ok"); + let _ = msg.close_frame().expect("close frame"); + }) + }); + + let client = warp::test::ws().handshake(route).await.expect("handshake"); + drop(client); +} + +#[tokio::test] +async fn send_ping() { + let _ = pretty_env_logger::try_init(); + + let filter = warp::ws().map(|ws: warp::ws::Ws| { + ws.on_upgrade(|mut websocket| { + async move { + websocket.send(Message::ping("srv")).await.unwrap(); + // assume the client will pong back + let msg = websocket.next().await.expect("item").expect("ok"); + assert!(msg.is_pong()); + assert_eq!(msg.as_bytes(), &b"srv"[..]); + } + }) + }); + + let mut client = warp::test::ws().handshake(filter).await.expect("handshake"); + + let msg = client.recv().await.expect("recv"); + assert!(msg.is_ping()); + assert_eq!(msg.as_bytes(), &b"srv"[..]); + + client.recv_closed().await.expect("closed"); +} + +#[tokio::test] +async fn echo_pings() { + let _ = pretty_env_logger::try_init(); + + let mut client = warp::test::ws() + .handshake(ws_echo()) + .await + .expect("handshake"); + + client.send(Message::ping("clt")).await; + + // tungstenite sends the PONG first + let msg = client.recv().await.expect("recv"); + assert!(msg.is_pong()); + assert_eq!(msg.as_bytes(), &b"clt"[..]); + + // and then `ws_echo` sends us back the same PING + let msg = client.recv().await.expect("recv"); + assert!(msg.is_ping()); + assert_eq!(msg.as_bytes(), &b"clt"[..]); + + // and then our client would have sent *its* PONG + // and `ws_echo` would send *that* back too + let msg = client.recv().await.expect("recv"); + assert!(msg.is_pong()); + assert_eq!(msg.as_bytes(), &b"clt"[..]); +} + +#[tokio::test] +async fn pongs_only() { + let _ = pretty_env_logger::try_init(); + + let mut client = warp::test::ws() + .handshake(ws_echo()) + .await + .expect("handshake"); + + // construct a pong message and make sure it is correct + let msg = Message::pong("clt"); + assert!(msg.is_pong()); + assert_eq!(msg.as_bytes(), &b"clt"[..]); + + // send it to echo and wait for `ws_echo` to send it back + client.send(msg).await; + + let msg = client.recv().await.expect("recv"); + assert!(msg.is_pong()); + assert_eq!(msg.as_bytes(), &b"clt"[..]); +} + +#[tokio::test] +async fn closed() { + let _ = pretty_env_logger::try_init(); + + let route = + warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|websocket| websocket.close().map(|_| ()))); + + let mut client = warp::test::ws().handshake(route).await.expect("handshake"); + + client.recv_closed().await.expect("closed"); +} + +#[tokio::test] +async fn limit_message_size() { + let _ = pretty_env_logger::try_init(); + + let echo = warp::ws().map(|ws: warp::ws::Ws| { + ws.max_message_size(1024).on_upgrade(|websocket| { + // Just echo all messages back... + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + assert!(result.is_err()); + assert_eq!( + format!("{}", result.unwrap_err()).as_str(), + "Space limit exceeded: Message too big: 0 + 1025 > 1024" + ); + }) + }) + }); + let mut client = warp::test::ws().handshake(echo).await.expect("handshake"); + + client.send(warp::ws::Message::binary(vec![0; 1025])).await; + client.send_text("hello warp").await; + assert!(client.recv().await.is_err()); +} + +#[tokio::test] +async fn limit_frame_size() { + let _ = pretty_env_logger::try_init(); + + let echo = warp::ws().map(|ws: warp::ws::Ws| { + ws.max_frame_size(1024).on_upgrade(|websocket| { + // Just echo all messages back... + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + assert!(result.is_err()); + assert_eq!( + format!("{}", result.unwrap_err()).as_str(), + "Space limit exceeded: Message length too big: 1025 > 1024" + ); + }) + }) + }); + let mut client = warp::test::ws().handshake(echo).await.expect("handshake"); + + client.send(warp::ws::Message::binary(vec![0; 1025])).await; + client.send_text("hello warp").await; + assert!(client.recv().await.is_err()); +} + +#[derive(Deserialize)] +struct MyQuery { + hello: String, +} + +#[tokio::test] +async fn ws_with_query() { + let ws_filter = warp::path("my-ws") + .and(warp::query::<MyQuery>()) + .and(warp::ws()) + .map(|query: MyQuery, ws: warp::ws::Ws| { + assert_eq!(query.hello, "world"); + + ws.on_upgrade(|websocket| { + let (tx, rx) = websocket.split(); + rx.inspect(|i| log::debug!("ws recv: {:?}", i)) + .forward(tx) + .map(|_| ()) + }) + }); + + warp::test::ws() + .path("/my-ws?hello=world") + .handshake(ws_filter) + .await + .expect("handshake"); +} + +// Websocket filter that echoes all messages back. +fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Copy { + warp::ws().map(|ws: warp::ws::Ws| { + ws.on_upgrade(|websocket| { + // Just echo all messages back... + let (tx, rx) = websocket.split(); + rx.inspect(|i| log::debug!("ws recv: {:?}", i)) + .forward(tx) + .map(|_| ()) + }) + }) +} |