diff options
Diffstat (limited to 'third_party/rust/warp/examples')
29 files changed, 1615 insertions, 0 deletions
diff --git a/third_party/rust/warp/examples/README.md b/third_party/rust/warp/examples/README.md new file mode 100644 index 0000000000..1b3bb8a2ae --- /dev/null +++ b/third_party/rust/warp/examples/README.md @@ -0,0 +1,68 @@ +# Examples + +Welcome to the examples! These show off `warp`'s functionality and explain how to use it. + +## Getting Started + +To get started, run `examples/hello.rs` with: + +```bash +> cargo run --example hello +``` + +This will start a simple "hello world" service running on your localhost port 3030. + +Open another terminal and run: + +```bash +> curl http://localhost:3030/hi +Hello, World!% +``` + +Congratulations, you have just run your first warp service! + +You can run other examples with `cargo run --example [example name]`: + +- [`hello.rs`](./hello.rs) - Just a basic "Hello World" API +- [`routing.rs`](./routing.rs) - Builds up a more complex set of routes and shows how to combine filters +- [`body.rs`](./body.rs) - What's a good API without parsing data from the request body? +- [`headers.rs`](./headers.rs) - Parsing data from the request headers +- [`rejections.rs`](./rejections.rs) - Your APIs are obviously perfect, but for silly others who call them incorrectly you'll want to define errors for them +- [`futures.rs`](./futures.rs) - Wait, wait! ... Or how to integrate futures into filters +- [`todos.rs`](./todos.rs) - Putting this all together with a proper app + +## Further Use Cases + +### Serving HTML and Other Files + +- [`file.rs`](./file.rs) - Serving static files +- [`dir.rs`](./dir.rs) - Or a whole directory of files +- [`handlebars_template.rs`](./handlebars_template.rs) - Using Handlebars to fill in an HTML template + +### Websockets + +Hooray! `warp` also includes built-in support for WebSockets + +- [`websockets.rs`](./websockets.rs) - Basic handling of a WebSocket upgrade +- [`websockets_chat.rs`](./websockets_chat.rs) - Full WebSocket app + +### Server-Side Events + +- [`sse.rs`](./sse.rs) - Basic Server-Side Event +- [`sse_chat.rs`](./sse_chat.rs) - Full SSE app + +### TLS + +- [`tls.rs`](./tls.rs) - can i haz security? + +### Autoreloading + +- [`autoreload.rs`](./autoreload.rs) - Change some code and watch the server reload automatically! + +### Debugging + +- [`tracing.rs`](./tracing.rs) - Warp has built-in support for rich diagnostics with [`tracing`](https://docs.rs/tracing)! + +## Custom HTTP Methods + +- [`custom_methods.rs`](./custom_methods.rs) - It is also possible to use Warp with custom HTTP methods. diff --git a/third_party/rust/warp/examples/autoreload.rs b/third_party/rust/warp/examples/autoreload.rs new file mode 100644 index 0000000000..a21d9b1369 --- /dev/null +++ b/third_party/rust/warp/examples/autoreload.rs @@ -0,0 +1,42 @@ +#![deny(warnings)] +use hyper::server::Server; +use listenfd::ListenFd; +use std::convert::Infallible; +use warp::Filter; + +/// You'll need to install `systemfd` and `cargo-watch`: +/// ``` +/// cargo install systemfd cargo-watch +/// ``` +/// And run with: +/// ``` +/// systemfd --no-pid -s http::3030 -- cargo watch -x 'run --example autoreload' +/// ``` +#[tokio::main] +async fn main() { + // Match any request and return hello world! + let routes = warp::any().map(|| "Hello, World!"); + + // hyper let's us build a server from a TcpListener (which will be + // useful shortly). Thus, we'll need to convert our `warp::Filter` into + // a `hyper::service::MakeService` for use with a `hyper::server::Server`. + let svc = warp::service(routes); + + let make_svc = hyper::service::make_service_fn(|_: _| { + // the clone is there because not all warp filters impl Copy + let svc = svc.clone(); + async move { Ok::<_, Infallible>(svc) } + }); + + let mut listenfd = ListenFd::from_env(); + // if listenfd doesn't take a TcpListener (i.e. we're not running via + // the command above), we fall back to explicitly binding to a given + // host:port. + let server = if let Some(l) = listenfd.take_tcp_listener(0).unwrap() { + Server::from_tcp(l).unwrap() + } else { + Server::bind(&([127, 0, 0, 1], 3030).into()) + }; + + server.serve(make_svc).await.unwrap(); +} diff --git a/third_party/rust/warp/examples/body.rs b/third_party/rust/warp/examples/body.rs new file mode 100644 index 0000000000..174d928ab8 --- /dev/null +++ b/third_party/rust/warp/examples/body.rs @@ -0,0 +1,30 @@ +#![deny(warnings)] + +use serde_derive::{Deserialize, Serialize}; + +use warp::Filter; + +#[derive(Deserialize, Serialize)] +struct Employee { + name: String, + rate: u32, +} + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // POST /employees/:rate {"name":"Sean","rate":2} + let promote = warp::post() + .and(warp::path("employees")) + .and(warp::path::param::<u32>()) + // Only accept bodies smaller than 16kb... + .and(warp::body::content_length_limit(1024 * 16)) + .and(warp::body::json()) + .map(|rate, mut employee: Employee| { + employee.rate = rate; + warp::reply::json(&employee) + }); + + warp::serve(promote).run(([127, 0, 0, 1], 3030)).await +} diff --git a/third_party/rust/warp/examples/compression.rs b/third_party/rust/warp/examples/compression.rs new file mode 100644 index 0000000000..1a52c7a7d0 --- /dev/null +++ b/third_party/rust/warp/examples/compression.rs @@ -0,0 +1,34 @@ +#![deny(warnings)] + +use warp::Filter; + +#[tokio::main] +async fn main() { + let file = warp::path("todos").and(warp::fs::file("./examples/todos.rs")); + // NOTE: You could double compress something by adding a compression + // filter here, a la + // ``` + // let file = warp::path("todos") + // .and(warp::fs::file("./examples/todos.rs")) + // .with(warp::compression::brotli()); + // ``` + // This would result in a browser error, or downloading a file whose contents + // are compressed + + let dir = warp::path("ws_chat").and(warp::fs::file("./examples/websockets_chat.rs")); + + let file_and_dir = warp::get() + .and(file.or(dir)) + .with(warp::compression::gzip()); + + let examples = warp::path("ex") + .and(warp::fs::dir("./examples/")) + .with(warp::compression::deflate()); + + // GET /todos => gzip -> toods.rs + // GET /ws_chat => gzip -> ws_chat.rs + // GET /ex/... => deflate -> ./examples/... + let routes = file_and_dir.or(examples); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/custom_methods.rs b/third_party/rust/warp/examples/custom_methods.rs new file mode 100644 index 0000000000..2e26604132 --- /dev/null +++ b/third_party/rust/warp/examples/custom_methods.rs @@ -0,0 +1,61 @@ +#![deny(warnings)] +use std::net::SocketAddr; + +use warp::hyper::StatusCode; +use warp::{hyper::Method, reject, Filter, Rejection, Reply}; + +#[derive(Debug)] +struct MethodError; +impl reject::Reject for MethodError {} + +const FOO_METHOD: &'static str = "FOO"; +const BAR_METHOD: &'static str = "BAR"; + +fn method(name: &'static str) -> impl Filter<Extract = (), Error = Rejection> + Clone { + warp::method() + .and_then(move |m: Method| async move { + if m == name { + Ok(()) + } else { + Err(reject::custom(MethodError)) + } + }) + .untuple_one() +} + +pub async fn handle_not_found(reject: Rejection) -> Result<impl Reply, Rejection> { + if reject.is_not_found() { + Ok(StatusCode::NOT_FOUND) + } else { + Err(reject) + } +} + +pub async fn handle_custom(reject: Rejection) -> Result<impl Reply, Rejection> { + if reject.find::<MethodError>().is_some() { + Ok(StatusCode::METHOD_NOT_ALLOWED) + } else { + Err(reject) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let address: SocketAddr = "[::]:3030".parse()?; + + let foo_route = method(FOO_METHOD) + .and(warp::path!("foo")) + .map(|| "Success") + .recover(handle_not_found); + + let bar_route = method(BAR_METHOD) + .and(warp::path!("bar")) + .map(|| "Success") + .recover(handle_not_found); + + warp::serve(foo_route.or(bar_route).recover(handle_custom)) + .run(address) + .await; + + Ok(()) +} diff --git a/third_party/rust/warp/examples/dir.rs b/third_party/rust/warp/examples/dir.rs new file mode 100644 index 0000000000..30261a220e --- /dev/null +++ b/third_party/rust/warp/examples/dir.rs @@ -0,0 +1,10 @@ +#![deny(warnings)] + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + warp::serve(warp::fs::dir("examples/dir")) + .run(([127, 0, 0, 1], 3030)) + .await; +} diff --git a/third_party/rust/warp/examples/dir/another.html b/third_party/rust/warp/examples/dir/another.html new file mode 100644 index 0000000000..941c9a5937 --- /dev/null +++ b/third_party/rust/warp/examples/dir/another.html @@ -0,0 +1,10 @@ +<!DOCTYPE html> +<html> + <head> + <title>dir/another.html</title> + </head> + <body> + <h1>Welcome to Another Page</h1> + <a href="/">back</a> + </body> +</html>
\ No newline at end of file diff --git a/third_party/rust/warp/examples/dir/index.html b/third_party/rust/warp/examples/dir/index.html new file mode 100644 index 0000000000..cb86323446 --- /dev/null +++ b/third_party/rust/warp/examples/dir/index.html @@ -0,0 +1,10 @@ +<!DOCTYPE html> +<html> + <head> + <title>dir/index.html</title> + </head> + <body> + <h1>Welcome to Dir</h1> + <a href="/another.html">another page</a> + </body> +</html>
\ No newline at end of file diff --git a/third_party/rust/warp/examples/dyn_reply.rs b/third_party/rust/warp/examples/dyn_reply.rs new file mode 100644 index 0000000000..4f59cf8ba9 --- /dev/null +++ b/third_party/rust/warp/examples/dyn_reply.rs @@ -0,0 +1,17 @@ +#![deny(warnings)] +use warp::{http::StatusCode, Filter}; + +async fn dyn_reply(word: String) -> Result<Box<dyn warp::Reply>, warp::Rejection> { + if &word == "hello" { + Ok(Box::new("world")) + } else { + Ok(Box::new(StatusCode::BAD_REQUEST)) + } +} + +#[tokio::main] +async fn main() { + let routes = warp::path::param().and_then(dyn_reply); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/file.rs b/third_party/rust/warp/examples/file.rs new file mode 100644 index 0000000000..a0cf2afa45 --- /dev/null +++ b/third_party/rust/warp/examples/file.rs @@ -0,0 +1,21 @@ +#![deny(warnings)] + +use warp::Filter; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let readme = warp::get() + .and(warp::path::end()) + .and(warp::fs::file("./README.md")); + + // dir already requires GET... + let examples = warp::path("ex").and(warp::fs::dir("./examples/")); + + // GET / => README.md + // GET /ex/... => ./examples/.. + let routes = readme.or(examples); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/futures.rs b/third_party/rust/warp/examples/futures.rs new file mode 100644 index 0000000000..43bf2f6efa --- /dev/null +++ b/third_party/rust/warp/examples/futures.rs @@ -0,0 +1,37 @@ +#![deny(warnings)] + +use std::convert::Infallible; +use std::str::FromStr; +use std::time::Duration; +use warp::Filter; + +#[tokio::main] +async fn main() { + // Match `/:Seconds`... + let routes = warp::path::param() + // and_then create a `Future` that will simply wait N seconds... + .and_then(sleepy); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +async fn sleepy(Seconds(seconds): Seconds) -> Result<impl warp::Reply, Infallible> { + tokio::time::sleep(Duration::from_secs(seconds)).await; + Ok(format!("I waited {} seconds!", seconds)) +} + +/// A newtype to enforce our maximum allowed seconds. +struct Seconds(u64); + +impl FromStr for Seconds { + type Err = (); + fn from_str(src: &str) -> Result<Self, Self::Err> { + src.parse::<u64>().map_err(|_| ()).and_then(|num| { + if num <= 5 { + Ok(Seconds(num)) + } else { + Err(()) + } + }) + } +} diff --git a/third_party/rust/warp/examples/handlebars_template.rs b/third_party/rust/warp/examples/handlebars_template.rs new file mode 100644 index 0000000000..78e040539e --- /dev/null +++ b/third_party/rust/warp/examples/handlebars_template.rs @@ -0,0 +1,58 @@ +#![deny(warnings)] +use std::sync::Arc; + +use handlebars::Handlebars; +use serde::Serialize; +use serde_json::json; +use warp::Filter; + +struct WithTemplate<T: Serialize> { + name: &'static str, + value: T, +} + +fn render<T>(template: WithTemplate<T>, hbs: Arc<Handlebars<'_>>) -> impl warp::Reply +where + T: Serialize, +{ + let render = hbs + .render(template.name, &template.value) + .unwrap_or_else(|err| err.to_string()); + warp::reply::html(render) +} + +#[tokio::main] +async fn main() { + let template = "<!DOCTYPE html> + <html> + <head> + <title>Warp Handlebars template example</title> + </head> + <body> + <h1>Hello {{user}}!</h1> + </body> + </html>"; + + let mut hb = Handlebars::new(); + // register the template + hb.register_template_string("template.html", template) + .unwrap(); + + // Turn Handlebars instance into a Filter so we can combine it + // easily with others... + let hb = Arc::new(hb); + + // Create a reusable closure to render template + let handlebars = move |with_template| render(with_template, hb.clone()); + + //GET / + let route = warp::get() + .and(warp::path::end()) + .map(|| WithTemplate { + name: "template.html", + value: json!({"user" : "Warp"}), + }) + .map(handlebars); + + warp::serve(route).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/headers.rs b/third_party/rust/warp/examples/headers.rs new file mode 100644 index 0000000000..2b3dca7b50 --- /dev/null +++ b/third_party/rust/warp/examples/headers.rs @@ -0,0 +1,27 @@ +#![deny(warnings)] +use std::net::SocketAddr; +use warp::Filter; + +/// Create a server that requires header conditions: +/// +/// - `Host` is a `SocketAddr` +/// - `Accept` is exactly `*/*` +/// +/// If these conditions don't match, a 404 is returned. +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // For this example, we assume no DNS was used, + // so the Host header should be an address. + let host = warp::header::<SocketAddr>("host"); + + // Match when we get `accept: */*` exactly. + let accept_stars = warp::header::exact("accept", "*/*"); + + let routes = host + .and(accept_stars) + .map(|addr| format!("accepting stars on {}", addr)); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/hello.rs b/third_party/rust/warp/examples/hello.rs new file mode 100644 index 0000000000..27aa2e51c8 --- /dev/null +++ b/third_party/rust/warp/examples/hello.rs @@ -0,0 +1,10 @@ +#![deny(warnings)] +use warp::Filter; + +#[tokio::main] +async fn main() { + // Match any request and return hello world! + let routes = warp::any().map(|| "Hello, World!"); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/query_string.rs b/third_party/rust/warp/examples/query_string.rs new file mode 100644 index 0000000000..869468eb0d --- /dev/null +++ b/third_party/rust/warp/examples/query_string.rs @@ -0,0 +1,59 @@ +use serde_derive::{Deserialize, Serialize}; +use std::collections::HashMap; +use warp::{ + http::{Response, StatusCode}, + Filter, +}; + +#[derive(Deserialize, Serialize)] +struct MyObject { + key1: String, + key2: u32, +} + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // get /example1?key=value + // demonstrates an optional parameter. + let example1 = warp::get() + .and(warp::path("example1")) + .and(warp::query::<HashMap<String, String>>()) + .map(|p: HashMap<String, String>| match p.get("key") { + Some(key) => Response::builder().body(format!("key = {}", key)), + None => Response::builder().body(String::from("No \"key\" param in query.")), + }); + + // get /example2?key1=value&key2=42 + // uses the query string to populate a custom object + let example2 = warp::get() + .and(warp::path("example2")) + .and(warp::query::<MyObject>()) + .map(|p: MyObject| { + Response::builder().body(format!("key1 = {}, key2 = {}", p.key1, p.key2)) + }); + + let opt_query = warp::query::<MyObject>() + .map(Some) + .or_else(|_| async { Ok::<(Option<MyObject>,), std::convert::Infallible>((None,)) }); + + // get /example3?key1=value&key2=42 + // builds on example2 but adds custom error handling + let example3 = + warp::get() + .and(warp::path("example3")) + .and(opt_query) + .map(|p: Option<MyObject>| match p { + Some(obj) => { + Response::builder().body(format!("key1 = {}, key2 = {}", obj.key1, obj.key2)) + } + None => Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(String::from("Failed to decode query param.")), + }); + + warp::serve(example1.or(example2).or(example3)) + .run(([127, 0, 0, 1], 3030)) + .await +} diff --git a/third_party/rust/warp/examples/rejections.rs b/third_party/rust/warp/examples/rejections.rs new file mode 100644 index 0000000000..721e69ecaa --- /dev/null +++ b/third_party/rust/warp/examples/rejections.rs @@ -0,0 +1,122 @@ +#![deny(warnings)] + +use std::convert::Infallible; +use std::error::Error; +use std::num::NonZeroU16; + +use serde_derive::{Deserialize, Serialize}; +use warp::http::StatusCode; +use warp::{reject, Filter, Rejection, Reply}; + +/// Rejections represent cases where a filter should not continue processing +/// the request, but a different filter *could* process it. +#[tokio::main] +async fn main() { + let math = warp::path!("math" / u16); + let div_with_header = math + .and(warp::get()) + .and(div_by()) + .map(|num: u16, denom: NonZeroU16| { + warp::reply::json(&Math { + op: format!("{} / {}", num, denom), + output: num / denom.get(), + }) + }); + + let div_with_body = + math.and(warp::post()) + .and(warp::body::json()) + .map(|num: u16, body: DenomRequest| { + warp::reply::json(&Math { + op: format!("{} / {}", num, body.denom), + output: num / body.denom.get(), + }) + }); + + let routes = div_with_header.or(div_with_body).recover(handle_rejection); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +/// Extract a denominator from a "div-by" header, or reject with DivideByZero. +fn div_by() -> impl Filter<Extract = (NonZeroU16,), Error = Rejection> + Copy { + warp::header::<u16>("div-by").and_then(|n: u16| async move { + if let Some(denom) = NonZeroU16::new(n) { + Ok(denom) + } else { + Err(reject::custom(DivideByZero)) + } + }) +} + +#[derive(Deserialize)] +struct DenomRequest { + pub denom: NonZeroU16, +} + +#[derive(Debug)] +struct DivideByZero; + +impl reject::Reject for DivideByZero {} + +// JSON replies + +/// A successful math operation. +#[derive(Serialize)] +struct Math { + op: String, + output: u16, +} + +/// An API error serializable to JSON. +#[derive(Serialize)] +struct ErrorMessage { + code: u16, + message: String, +} + +// This function receives a `Rejection` and tries to return a custom +// value, otherwise simply passes the rejection along. +async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> { + let code; + let message; + + if err.is_not_found() { + code = StatusCode::NOT_FOUND; + message = "NOT_FOUND"; + } else if let Some(DivideByZero) = err.find() { + code = StatusCode::BAD_REQUEST; + message = "DIVIDE_BY_ZERO"; + } else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() { + // This error happens if the body could not be deserialized correctly + // We can use the cause to analyze the error and customize the error message + message = match e.source() { + Some(cause) => { + if cause.to_string().contains("denom") { + "FIELD_ERROR: denom" + } else { + "BAD_REQUEST" + } + } + None => "BAD_REQUEST", + }; + code = StatusCode::BAD_REQUEST; + } else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() { + // We can handle a specific error, here METHOD_NOT_ALLOWED, + // and render it however we want + code = StatusCode::METHOD_NOT_ALLOWED; + message = "METHOD_NOT_ALLOWED"; + } else { + // We should have expected this... Just log and say its a 500 + eprintln!("unhandled rejection: {:?}", err); + code = StatusCode::INTERNAL_SERVER_ERROR; + message = "UNHANDLED_REJECTION"; + } + + let json = warp::reply::json(&ErrorMessage { + code: code.as_u16(), + message: message.into(), + }); + + Ok(warp::reply::with_status(json, code)) +} diff --git a/third_party/rust/warp/examples/returning.rs b/third_party/rust/warp/examples/returning.rs new file mode 100644 index 0000000000..f4f61e60fc --- /dev/null +++ b/third_party/rust/warp/examples/returning.rs @@ -0,0 +1,20 @@ +use warp::{filters::BoxedFilter, Filter, Rejection, Reply}; + +// Option 1: BoxedFilter +// Note that this may be useful for shortening compile times when you are composing many filters. +// Boxing the filters will use dynamic dispatch and speed up compilation while +// making it slightly slower at runtime. +pub fn assets_filter() -> BoxedFilter<(impl Reply,)> { + warp::path("assets").and(warp::fs::dir("./assets")).boxed() +} + +// Option 2: impl Filter + Clone +pub fn index_filter() -> impl Filter<Extract = (&'static str,), Error = Rejection> + Clone { + warp::path::end().map(|| "Index page") +} + +#[tokio::main] +async fn main() { + let routes = index_filter().or(assets_filter()); + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/routing.rs b/third_party/rust/warp/examples/routing.rs new file mode 100644 index 0000000000..b2ad8c278d --- /dev/null +++ b/third_party/rust/warp/examples/routing.rs @@ -0,0 +1,104 @@ +#![deny(warnings)] + +use warp::Filter; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // We'll start simple, and gradually show how you combine these powers + // into super powers! + + // GET / + let hello_world = warp::path::end().map(|| "Hello, World at root!"); + + // GET /hi + let hi = warp::path("hi").map(|| "Hello, World!"); + + // How about multiple segments? First, we could use the `path!` macro: + // + // GET /hello/from/warp + let hello_from_warp = warp::path!("hello" / "from" / "warp").map(|| "Hello from warp!"); + + // Fine, but how do I handle parameters in paths? + // + // GET /sum/:u32/:u32 + let sum = warp::path!("sum" / u32 / u32).map(|a, b| format!("{} + {} = {}", a, b, a + b)); + + // Any type that implements FromStr can be used, and in any order: + // + // GET /:u16/times/:u16 + let times = + warp::path!(u16 / "times" / u16).map(|a, b| format!("{} times {} = {}", a, b, a * b)); + + // Oh shoot, those math routes should be mounted at a different path, + // is that possible? Yep. + // + // GET /math/sum/:u32/:u32 + // GET /math/:u16/times/:u16 + let math = warp::path("math"); + let _sum = math.and(sum); + let _times = math.and(times); + + // What! And? What's that do? + // + // It combines the filters in a sort of "this and then that" order. In + // fact, it's exactly what the `path!` macro has been doing internally. + // + // GET /bye/:string + let bye = warp::path("bye") + .and(warp::path::param()) + .map(|name: String| format!("Good bye, {}!", name)); + + // Ah, can filters do things besides `and`? + // + // Why, yes they can! They can also `or`! As you might expect, `or` creates + // a "this or else that" chain of filters. If the first doesn't succeed, + // then it tries the other. + // + // So, those `math` routes could have been mounted all as one, with `or`. + // + // GET /math/sum/:u32/:u32 + // GET /math/:u16/times/:u16 + let math = warp::path("math").and(sum.or(times)); + + // We can use the end() filter to match a shorter path + let help = warp::path("math") + // Careful! Omitting the following line would make this filter match + // requests to /math/sum/:u32/:u32 and /math/:u16/times/:u16 + .and(warp::path::end()) + .map(|| "This is the Math API. Try calling /math/sum/:u32/:u32 or /math/:u16/times/:u16"); + let math = help.or(math); + + // Let's let people know that the `sum` and `times` routes are under `math`. + let sum = sum.map(|output| format!("(This route has moved to /math/sum/:u16/:u16) {}", output)); + let times = + times.map(|output| format!("(This route has moved to /math/:u16/times/:u16) {}", output)); + + // It turns out, using `or` is how you combine everything together into + // a single API. (We also actually haven't been enforcing that the + // method is GET, so we'll do that too!) + // + // GET / + // GET /hi + // GET /hello/from/warp + // GET /bye/:string + // GET /math/sum/:u32/:u32 + // GET /math/:u16/times/:u16 + + let routes = warp::get().and( + hello_world + .or(hi) + .or(hello_from_warp) + .or(bye) + .or(math) + .or(sum) + .or(times), + ); + + // Note that composing filters for many routes may increase compile times (because it uses a lot of generics). + // If you wish to use dynamic dispatch instead and speed up compile times while + // making it slightly slower at runtime, you can use Filter::boxed(). + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/sse.rs b/third_party/rust/warp/examples/sse.rs new file mode 100644 index 0000000000..bce1fb6b1a --- /dev/null +++ b/third_party/rust/warp/examples/sse.rs @@ -0,0 +1,31 @@ +use futures_util::StreamExt; +use std::convert::Infallible; +use std::time::Duration; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; +use warp::{sse::Event, Filter}; + +// create server-sent event +fn sse_counter(counter: u64) -> Result<Event, Infallible> { + Ok(warp::sse::Event::default().data(counter.to_string())) +} + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let routes = warp::path("ticks").and(warp::get()).map(|| { + let mut counter: u64 = 0; + // create server event source + let interval = interval(Duration::from_secs(1)); + let stream = IntervalStream::new(interval); + let event_stream = stream.map(move |_| { + counter += 1; + sse_counter(counter) + }); + // reply using server-sent events + warp::sse::reply(event_stream) + }); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/sse_chat.rs b/third_party/rust/warp/examples/sse_chat.rs new file mode 100644 index 0000000000..6e064b1824 --- /dev/null +++ b/third_party/rust/warp/examples/sse_chat.rs @@ -0,0 +1,163 @@ +use futures_util::{Stream, StreamExt}; +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; +use warp::{sse::Event, Filter}; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // Keep track of all connected users, key is usize, value + // is an event stream sender. + let users = Arc::new(Mutex::new(HashMap::new())); + // Turn our "state" into a new Filter... + let users = warp::any().map(move || users.clone()); + + // POST /chat -> send message + let chat_send = warp::path("chat") + .and(warp::post()) + .and(warp::path::param::<usize>()) + .and(warp::body::content_length_limit(500)) + .and( + warp::body::bytes().and_then(|body: bytes::Bytes| async move { + std::str::from_utf8(&body) + .map(String::from) + .map_err(|_e| warp::reject::custom(NotUtf8)) + }), + ) + .and(users.clone()) + .map(|my_id, msg, users| { + user_message(my_id, msg, &users); + warp::reply() + }); + + // GET /chat -> messages stream + let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| { + // reply using server-sent events + let stream = user_connected(users); + warp::sse::reply(warp::sse::keep_alive().stream(stream)) + }); + + // GET / -> index html + let index = warp::path::end().map(|| { + warp::http::Response::builder() + .header("content-type", "text/html; charset=utf-8") + .body(INDEX_HTML) + }); + + let routes = index.or(chat_recv).or(chat_send); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +/// Our global unique user id counter. +static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); + +/// Message variants. +#[derive(Debug)] +enum Message { + UserId(usize), + Reply(String), +} + +#[derive(Debug)] +struct NotUtf8; +impl warp::reject::Reject for NotUtf8 {} + +/// Our state of currently connected users. +/// +/// - Key is their id +/// - Value is a sender of `Message` +type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>; + +fn user_connected(users: Users) -> impl Stream<Item = Result<Event, warp::Error>> + Send + 'static { + // Use a counter to assign a new unique ID for this user. + let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); + + eprintln!("new chat user: {}", my_id); + + // Use an unbounded channel to handle buffering and flushing of messages + // to the event source... + let (tx, rx) = mpsc::unbounded_channel(); + let rx = UnboundedReceiverStream::new(rx); + + tx.send(Message::UserId(my_id)) + // rx is right above, so this cannot fail + .unwrap(); + + // Save the sender in our list of connected users. + users.lock().unwrap().insert(my_id, tx); + + // Convert messages into Server-Sent Events and return resulting stream. + rx.map(|msg| match msg { + Message::UserId(my_id) => Ok(Event::default().event("user").data(my_id.to_string())), + Message::Reply(reply) => Ok(Event::default().data(reply)), + }) +} + +fn user_message(my_id: usize, msg: String, users: &Users) { + let new_msg = format!("<User#{}>: {}", my_id, msg); + + // New message from this user, send it to everyone else (except same uid)... + // + // We use `retain` instead of a for loop so that we can reap any user that + // appears to have disconnected. + users.lock().unwrap().retain(|uid, tx| { + if my_id == *uid { + // don't send to same user, but do retain + true + } else { + // If not `is_ok`, the SSE stream is gone, and so don't retain + tx.send(Message::Reply(new_msg.clone())).is_ok() + } + }); +} + +static INDEX_HTML: &str = r#" +<!DOCTYPE html> +<html> + <head> + <title>Warp Chat</title> + </head> + <body> + <h1>warp chat</h1> + <div id="chat"> + <p><em>Connecting...</em></p> + </div> + <input type="text" id="text" /> + <button type="button" id="send">Send</button> + <script type="text/javascript"> + var uri = 'http://' + location.host + '/chat'; + var sse = new EventSource(uri); + function message(data) { + var line = document.createElement('p'); + line.innerText = data; + chat.appendChild(line); + } + sse.onopen = function() { + chat.innerHTML = "<p><em>Connected!</em></p>"; + } + var user_id; + sse.addEventListener("user", function(msg) { + user_id = msg.data; + }); + sse.onmessage = function(msg) { + message(msg.data); + }; + send.onclick = function() { + var msg = text.value; + var xhr = new XMLHttpRequest(); + xhr.open("POST", uri + '/' + user_id, true); + xhr.send(msg); + text.value = ''; + message('<You>: ' + msg); + }; + </script> + </body> +</html> +"#; diff --git a/third_party/rust/warp/examples/tls.rs b/third_party/rust/warp/examples/tls.rs new file mode 100644 index 0000000000..7d28e03a3a --- /dev/null +++ b/third_party/rust/warp/examples/tls.rs @@ -0,0 +1,25 @@ +#![deny(warnings)] + +// Don't copy this `cfg`, it's only needed because this file is within +// the warp repository. +// Instead, specify the "tls" feature in your warp dependency declaration. +#[cfg(feature = "tls")] +#[tokio::main] +async fn main() { + use warp::Filter; + + // Match any request and return hello world! + let routes = warp::any().map(|| "Hello, World!"); + + warp::serve(routes) + .tls() + .cert_path("examples/tls/cert.pem") + .key_path("examples/tls/key.rsa") + .run(([127, 0, 0, 1], 3030)) + .await; +} + +#[cfg(not(feature = "tls"))] +fn main() { + eprintln!("Requires the `tls` feature."); +} diff --git a/third_party/rust/warp/examples/tls/cert.pem b/third_party/rust/warp/examples/tls/cert.pem new file mode 100644 index 0000000000..03af12ff81 --- /dev/null +++ b/third_party/rust/warp/examples/tls/cert.pem @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIIEADCCAmigAwIBAgICAcgwDQYJKoZIhvcNAQELBQAwLDEqMCgGA1UEAwwhcG9u +eXRvd24gUlNBIGxldmVsIDIgaW50ZXJtZWRpYXRlMB4XDTE2MDgxMzE2MDcwNFoX +DTIyMDIwMzE2MDcwNFowGTEXMBUGA1UEAwwOdGVzdHNlcnZlci5jb20wggEiMA0G +CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCpVhh1/FNP2qvWenbZSghari/UThwe +dynfnHG7gc3JmygkEdErWBO/CHzHgsx7biVE5b8sZYNEDKFojyoPHGWK2bQM/FTy +niJCgNCLdn6hUqqxLAml3cxGW77hAWu94THDGB1qFe+eFiAUnDmob8gNZtAzT6Ky +b/JGJdrEU0wj+Rd7wUb4kpLInNH/Jc+oz2ii2AjNbGOZXnRz7h7Kv3sO9vABByYe +LcCj3qnhejHMqVhbAT1MD6zQ2+YKBjE52MsQKU/xhUpu9KkUyLh0cxkh3zrFiKh4 +Vuvtc+n7aeOv2jJmOl1dr0XLlSHBlmoKqH6dCTSbddQLmlK7dms8vE01AgMBAAGj +gb4wgbswDAYDVR0TAQH/BAIwADALBgNVHQ8EBAMCBsAwHQYDVR0OBBYEFMeUzGYV +bXwJNQVbY1+A8YXYZY8pMEIGA1UdIwQ7MDmAFJvEsUi7+D8vp8xcWvnEdVBGkpoW +oR6kHDAaMRgwFgYDVQQDDA9wb255dG93biBSU0EgQ0GCAXswOwYDVR0RBDQwMoIO +dGVzdHNlcnZlci5jb22CFXNlY29uZC50ZXN0c2VydmVyLmNvbYIJbG9jYWxob3N0 +MA0GCSqGSIb3DQEBCwUAA4IBgQBsk5ivAaRAcNgjc7LEiWXFkMg703AqDDNx7kB1 +RDgLalLvrjOfOp2jsDfST7N1tKLBSQ9bMw9X4Jve+j7XXRUthcwuoYTeeo+Cy0/T +1Q78ctoX74E2nB958zwmtRykGrgE/6JAJDwGcgpY9kBPycGxTlCN926uGxHsDwVs +98cL6ZXptMLTR6T2XP36dAJZuOICSqmCSbFR8knc/gjUO36rXTxhwci8iDbmEVaf +BHpgBXGU5+SQ+QM++v6bHGf4LNQC5NZ4e4xvGax8ioYu/BRsB/T3Lx+RlItz4zdU +XuxCNcm3nhQV2ZHquRdbSdoyIxV5kJXel4wCmOhWIq7A2OBKdu5fQzIAzzLi65EN +RPAKsKB4h7hGgvciZQ7dsMrlGw0DLdJ6UrFyiR5Io7dXYT/+JP91lP5xsl6Lhg9O +FgALt7GSYRm2cZdgi9pO9rRr83Br1VjQT1vHz6yoZMXSqc4A2zcN2a2ZVq//rHvc +FZygs8miAhWPzqnpmgTj1cPiU1M= +-----END CERTIFICATE----- diff --git a/third_party/rust/warp/examples/tls/key.rsa b/third_party/rust/warp/examples/tls/key.rsa new file mode 100644 index 0000000000..b13bf5d07f --- /dev/null +++ b/third_party/rust/warp/examples/tls/key.rsa @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAqVYYdfxTT9qr1np22UoIWq4v1E4cHncp35xxu4HNyZsoJBHR +K1gTvwh8x4LMe24lROW/LGWDRAyhaI8qDxxlitm0DPxU8p4iQoDQi3Z+oVKqsSwJ +pd3MRlu+4QFrveExwxgdahXvnhYgFJw5qG/IDWbQM0+ism/yRiXaxFNMI/kXe8FG ++JKSyJzR/yXPqM9ootgIzWxjmV50c+4eyr97DvbwAQcmHi3Ao96p4XoxzKlYWwE9 +TA+s0NvmCgYxOdjLEClP8YVKbvSpFMi4dHMZId86xYioeFbr7XPp+2njr9oyZjpd +Xa9Fy5UhwZZqCqh+nQk0m3XUC5pSu3ZrPLxNNQIDAQABAoIBAFKtZJgGsK6md4vq +kyiYSufrcBLaaEQ/rkQtYCJKyC0NAlZKFLRy9oEpJbNLm4cQSkYPXn3Qunx5Jj2k +2MYz+SgIDy7f7KHgr52Ew020dzNQ52JFvBgt6NTZaqL1TKOS1fcJSSNIvouTBerK +NCSXHzfb4P+MfEVe/w1c4ilE+kH9SzdEo2jK/sRbzHIY8TX0JbmQ4SCLLayr22YG +usIxtIYcWt3MMP/G2luRnYzzBCje5MXdpAhlHLi4TB6x4h5PmBKYc57uOVNngKLd +YyrQKcszW4Nx5v0a4HG3A5EtUXNCco1+5asXOg2lYphQYVh2R+1wgu5WiDjDVu+6 +EYgjFSkCgYEA0NBk6FDoxE/4L/4iJ4zIhu9BptN8Je/uS5c6wRejNC/VqQyw7SHb +hRFNrXPvq5Y+2bI/DxtdzZLKAMXOMjDjj0XEgfOIn2aveOo3uE7zf1i+njxwQhPu +uSYA9AlBZiKGr2PCYSDPnViHOspVJjxRuAgyWM1Qf+CTC0D95aj0oz8CgYEAz5n4 +Cb3/WfUHxMJLljJ7PlVmlQpF5Hk3AOR9+vtqTtdxRjuxW6DH2uAHBDdC3OgppUN4 +CFj55kzc2HUuiHtmPtx8mK6G+otT7Lww+nLSFL4PvZ6CYxqcio5MPnoYd+pCxrXY +JFo2W7e4FkBOxb5PF5So5plg+d0z/QiA7aFP1osCgYEAtgi1rwC5qkm8prn4tFm6 +hkcVCIXc+IWNS0Bu693bXKdGr7RsmIynff1zpf4ntYGpEMaeymClCY0ppDrMYlzU +RBYiFNdlBvDRj6s/H+FTzHRk2DT/99rAhY9nzVY0OQFoQIXK8jlURGrkmI/CYy66 +XqBmo5t4zcHM7kaeEBOWEKkCgYAYnO6VaRtPNQfYwhhoFFAcUc+5t+AVeHGW/4AY +M5qlAlIBu64JaQSI5KqwS0T4H+ZgG6Gti68FKPO+DhaYQ9kZdtam23pRVhd7J8y+ +xMI3h1kiaBqZWVxZ6QkNFzizbui/2mtn0/JB6YQ/zxwHwcpqx0tHG8Qtm5ZAV7PB +eLCYhQKBgQDALJxU/6hMTdytEU5CLOBSMby45YD/RrfQrl2gl/vA0etPrto4RkVq +UrkDO/9W4mZORClN3knxEFSTlYi8YOboxdlynpFfhcs82wFChs+Ydp1eEsVHAqtu +T+uzn0sroycBiBfVB949LExnzGDFUkhG0i2c2InarQYLTsIyHCIDEA== +-----END RSA PRIVATE KEY----- diff --git a/third_party/rust/warp/examples/todos.rs b/third_party/rust/warp/examples/todos.rs new file mode 100644 index 0000000000..904d604e8f --- /dev/null +++ b/third_party/rust/warp/examples/todos.rs @@ -0,0 +1,291 @@ +#![deny(warnings)] + +use std::env; +use warp::Filter; + +/// Provides a RESTful web server managing some Todos. +/// +/// API will be: +/// +/// - `GET /todos`: return a JSON list of Todos. +/// - `POST /todos`: create a new Todo. +/// - `PUT /todos/:id`: update a specific Todo. +/// - `DELETE /todos/:id`: delete a specific Todo. +#[tokio::main] +async fn main() { + if env::var_os("RUST_LOG").is_none() { + // Set `RUST_LOG=todos=debug` to see debug logs, + // this only shows access logs. + env::set_var("RUST_LOG", "todos=info"); + } + pretty_env_logger::init(); + + let db = models::blank_db(); + + let api = filters::todos(db); + + // View access logs by setting `RUST_LOG=todos`. + let routes = api.with(warp::log("todos")); + // Start up the server... + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +mod filters { + use super::handlers; + use super::models::{Db, ListOptions, Todo}; + use warp::Filter; + + /// The 4 TODOs filters combined. + pub fn todos( + db: Db, + ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { + todos_list(db.clone()) + .or(todos_create(db.clone())) + .or(todos_update(db.clone())) + .or(todos_delete(db)) + } + + /// GET /todos?offset=3&limit=5 + pub fn todos_list( + db: Db, + ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { + warp::path!("todos") + .and(warp::get()) + .and(warp::query::<ListOptions>()) + .and(with_db(db)) + .and_then(handlers::list_todos) + } + + /// POST /todos with JSON body + pub fn todos_create( + db: Db, + ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { + warp::path!("todos") + .and(warp::post()) + .and(json_body()) + .and(with_db(db)) + .and_then(handlers::create_todo) + } + + /// PUT /todos/:id with JSON body + pub fn todos_update( + db: Db, + ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { + warp::path!("todos" / u64) + .and(warp::put()) + .and(json_body()) + .and(with_db(db)) + .and_then(handlers::update_todo) + } + + /// DELETE /todos/:id + pub fn todos_delete( + db: Db, + ) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone { + // We'll make one of our endpoints admin-only to show how authentication filters are used + let admin_only = warp::header::exact("authorization", "Bearer admin"); + + warp::path!("todos" / u64) + // It is important to put the auth check _after_ the path filters. + // If we put the auth check before, the request `PUT /todos/invalid-string` + // would try this filter and reject because the authorization header doesn't match, + // rather because the param is wrong for that other path. + .and(admin_only) + .and(warp::delete()) + .and(with_db(db)) + .and_then(handlers::delete_todo) + } + + fn with_db(db: Db) -> impl Filter<Extract = (Db,), Error = std::convert::Infallible> + Clone { + warp::any().map(move || db.clone()) + } + + fn json_body() -> impl Filter<Extract = (Todo,), Error = warp::Rejection> + Clone { + // When accepting a body, we want a JSON body + // (and to reject huge payloads)... + warp::body::content_length_limit(1024 * 16).and(warp::body::json()) + } +} + +/// These are our API handlers, the ends of each filter chain. +/// Notice how thanks to using `Filter::and`, we can define a function +/// with the exact arguments we'd expect from each filter in the chain. +/// No tuples are needed, it's auto flattened for the functions. +mod handlers { + use super::models::{Db, ListOptions, Todo}; + use std::convert::Infallible; + use warp::http::StatusCode; + + pub async fn list_todos(opts: ListOptions, db: Db) -> Result<impl warp::Reply, Infallible> { + // Just return a JSON array of todos, applying the limit and offset. + let todos = db.lock().await; + let todos: Vec<Todo> = todos + .clone() + .into_iter() + .skip(opts.offset.unwrap_or(0)) + .take(opts.limit.unwrap_or(std::usize::MAX)) + .collect(); + Ok(warp::reply::json(&todos)) + } + + pub async fn create_todo(create: Todo, db: Db) -> Result<impl warp::Reply, Infallible> { + log::debug!("create_todo: {:?}", create); + + let mut vec = db.lock().await; + + for todo in vec.iter() { + if todo.id == create.id { + log::debug!(" -> id already exists: {}", create.id); + // Todo with id already exists, return `400 BadRequest`. + return Ok(StatusCode::BAD_REQUEST); + } + } + + // No existing Todo with id, so insert and return `201 Created`. + vec.push(create); + + Ok(StatusCode::CREATED) + } + + pub async fn update_todo( + id: u64, + update: Todo, + db: Db, + ) -> Result<impl warp::Reply, Infallible> { + log::debug!("update_todo: id={}, todo={:?}", id, update); + let mut vec = db.lock().await; + + // Look for the specified Todo... + for todo in vec.iter_mut() { + if todo.id == id { + *todo = update; + return Ok(StatusCode::OK); + } + } + + log::debug!(" -> todo id not found!"); + + // If the for loop didn't return OK, then the ID doesn't exist... + Ok(StatusCode::NOT_FOUND) + } + + pub async fn delete_todo(id: u64, db: Db) -> Result<impl warp::Reply, Infallible> { + log::debug!("delete_todo: id={}", id); + + let mut vec = db.lock().await; + + let len = vec.len(); + vec.retain(|todo| { + // Retain all Todos that aren't this id... + // In other words, remove all that *are* this id... + todo.id != id + }); + + // If the vec is smaller, we found and deleted a Todo! + let deleted = vec.len() != len; + + if deleted { + // respond with a `204 No Content`, which means successful, + // yet no body expected... + Ok(StatusCode::NO_CONTENT) + } else { + log::debug!(" -> todo id not found!"); + Ok(StatusCode::NOT_FOUND) + } + } +} + +mod models { + use serde_derive::{Deserialize, Serialize}; + use std::sync::Arc; + use tokio::sync::Mutex; + + /// So we don't have to tackle how different database work, we'll just use + /// a simple in-memory DB, a vector synchronized by a mutex. + pub type Db = Arc<Mutex<Vec<Todo>>>; + + pub fn blank_db() -> Db { + Arc::new(Mutex::new(Vec::new())) + } + + #[derive(Debug, Deserialize, Serialize, Clone)] + pub struct Todo { + pub id: u64, + pub text: String, + pub completed: bool, + } + + // The query parameters for list_todos. + #[derive(Debug, Deserialize)] + pub struct ListOptions { + pub offset: Option<usize>, + pub limit: Option<usize>, + } +} + +#[cfg(test)] +mod tests { + use warp::http::StatusCode; + use warp::test::request; + + use super::{ + filters, + models::{self, Todo}, + }; + + #[tokio::test] + async fn test_post() { + let db = models::blank_db(); + let api = filters::todos(db); + + let resp = request() + .method("POST") + .path("/todos") + .json(&todo1()) + .reply(&api) + .await; + + assert_eq!(resp.status(), StatusCode::CREATED); + } + + #[tokio::test] + async fn test_post_conflict() { + let db = models::blank_db(); + db.lock().await.push(todo1()); + let api = filters::todos(db); + + let resp = request() + .method("POST") + .path("/todos") + .json(&todo1()) + .reply(&api) + .await; + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn test_put_unknown() { + let _ = pretty_env_logger::try_init(); + let db = models::blank_db(); + let api = filters::todos(db); + + let resp = request() + .method("PUT") + .path("/todos/1") + .header("authorization", "Bearer admin") + .json(&todo1()) + .reply(&api) + .await; + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + fn todo1() -> Todo { + Todo { + id: 1, + text: "test 1".into(), + completed: false, + } + } +} diff --git a/third_party/rust/warp/examples/tracing.rs b/third_party/rust/warp/examples/tracing.rs new file mode 100644 index 0000000000..103f747a93 --- /dev/null +++ b/third_party/rust/warp/examples/tracing.rs @@ -0,0 +1,59 @@ +//! [`tracing`] is a framework for instrumenting Rust programs to +//! collect scoped, structured, and async-aware diagnostics. This example +//! demonstrates how the `warp::trace` module can be used to instrument `warp` +//! applications with `tracing`. +//! +//! [`tracing`]: https://crates.io/crates/tracing +#![deny(warnings)] +use tracing_subscriber::fmt::format::FmtSpan; +use warp::Filter; + +#[tokio::main] +async fn main() { + // Filter traces based on the RUST_LOG env var, or, if it's not set, + // default to show the output of the example. + let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| "tracing=info,warp=debug".to_owned()); + + // Configure the default `tracing` subscriber. + // The `fmt` subscriber from the `tracing-subscriber` crate logs `tracing` + // events to stdout. Other subscribers are available for integrating with + // distributed tracing systems such as OpenTelemetry. + tracing_subscriber::fmt() + // Use the filter we built above to determine which traces to record. + .with_env_filter(filter) + // Record an event when each span closes. This can be used to time our + // routes' durations! + .with_span_events(FmtSpan::CLOSE) + .init(); + + let hello = warp::path("hello") + .and(warp::get()) + // When the `hello` route is called, emit a `tracing` event. + .map(|| { + tracing::info!("saying hello..."); + "Hello, World!" + }) + // Wrap the route in a `tracing` span to add the route's name as context + // to any events that occur inside it. + .with(warp::trace::named("hello")); + + let goodbye = warp::path("goodbye") + .and(warp::get()) + .map(|| { + tracing::info!("saying goodbye..."); + "So long and thanks for all the fish!" + }) + // We can also provide our own custom `tracing` spans to wrap a route. + .with(warp::trace(|info| { + // Construct our own custom span for this route. + tracing::info_span!("goodbye", req.path = ?info.path()) + })); + + let routes = hello + .or(goodbye) + // Wrap all the routes with a filter that creates a `tracing` span for + // each request we receive, including data about the request. + .with(warp::trace::request()); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/unix_socket.rs b/third_party/rust/warp/examples/unix_socket.rs new file mode 100644 index 0000000000..521aeead21 --- /dev/null +++ b/third_party/rust/warp/examples/unix_socket.rs @@ -0,0 +1,22 @@ +#![deny(warnings)] + +#[cfg(unix)] +#[tokio::main] +async fn main() { + use tokio::net::UnixListener; + use tokio_stream::wrappers::UnixListenerStream; + + pretty_env_logger::init(); + + let listener = UnixListener::bind("/tmp/warp.sock").unwrap(); + let incoming = UnixListenerStream::new(listener); + warp::serve(warp::fs::dir("examples/dir")) + .run_incoming(incoming) + .await; +} + +#[cfg(not(unix))] +#[tokio::main] +async fn main() { + panic!("Must run under Unix-like platform!"); +} diff --git a/third_party/rust/warp/examples/websockets.rs b/third_party/rust/warp/examples/websockets.rs new file mode 100644 index 0000000000..b0de205743 --- /dev/null +++ b/third_party/rust/warp/examples/websockets.rs @@ -0,0 +1,27 @@ +#![deny(warnings)] + +use futures_util::{FutureExt, StreamExt}; +use warp::Filter; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let routes = warp::path("echo") + // The `ws()` filter will prepare the Websocket handshake. + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { + // And then our closure will be called when it completes... + ws.on_upgrade(|websocket| { + // Just echo all messages back... + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + eprintln!("websocket error: {:?}", e); + } + }) + }) + }); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/third_party/rust/warp/examples/websockets_chat.rs b/third_party/rust/warp/examples/websockets_chat.rs new file mode 100644 index 0000000000..21e2286f6f --- /dev/null +++ b/third_party/rust/warp/examples/websockets_chat.rs @@ -0,0 +1,175 @@ +// #![deny(warnings)] +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use futures_util::{SinkExt, StreamExt, TryFutureExt}; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use warp::ws::{Message, WebSocket}; +use warp::Filter; + +/// Our global unique user id counter. +static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); + +/// Our state of currently connected users. +/// +/// - Key is their id +/// - Value is a sender of `warp::ws::Message` +type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // Keep track of all connected users, key is usize, value + // is a websocket sender. + let users = Users::default(); + // Turn our "state" into a new Filter... + let users = warp::any().map(move || users.clone()); + + // GET /chat -> websocket upgrade + let chat = warp::path("chat") + // The `ws()` filter will prepare Websocket handshake... + .and(warp::ws()) + .and(users) + .map(|ws: warp::ws::Ws, users| { + // This will call our function if the handshake succeeds. + ws.on_upgrade(move |socket| user_connected(socket, users)) + }); + + // GET / -> index html + let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML)); + + let routes = index.or(chat); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +async fn user_connected(ws: WebSocket, users: Users) { + // Use a counter to assign a new unique ID for this user. + let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); + + eprintln!("new chat user: {}", my_id); + + // Split the socket into a sender and receive of messages. + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + + // Use an unbounded channel to handle buffering and flushing of messages + // to the websocket... + let (tx, rx) = mpsc::unbounded_channel(); + let mut rx = UnboundedReceiverStream::new(rx); + + tokio::task::spawn(async move { + while let Some(message) = rx.next().await { + user_ws_tx + .send(message) + .unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }) + .await; + } + }); + + // Save the sender in our list of connected users. + users.write().await.insert(my_id, tx); + + // Return a `Future` that is basically a state machine managing + // this specific user's connection. + + // Every time the user sends a message, broadcast it to + // all other users... + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + eprintln!("websocket error(uid={}): {}", my_id, e); + break; + } + }; + user_message(my_id, msg, &users).await; + } + + // user_ws_rx stream will keep processing as long as the user stays + // connected. Once they disconnect, then... + user_disconnected(my_id, &users).await; +} + +async fn user_message(my_id: usize, msg: Message, users: &Users) { + // Skip any non-Text messages... + let msg = if let Ok(s) = msg.to_str() { + s + } else { + return; + }; + + let new_msg = format!("<User#{}>: {}", my_id, msg); + + // New message from this user, send it to everyone else (except same uid)... + for (&uid, tx) in users.read().await.iter() { + if my_id != uid { + if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { + // The tx is disconnected, our `user_disconnected` code + // should be happening in another task, nothing more to + // do here. + } + } + } +} + +async fn user_disconnected(my_id: usize, users: &Users) { + eprintln!("good bye user: {}", my_id); + + // Stream closed up, so remove from the user list + users.write().await.remove(&my_id); +} + +static INDEX_HTML: &str = r#"<!DOCTYPE html> +<html lang="en"> + <head> + <title>Warp Chat</title> + </head> + <body> + <h1>Warp chat</h1> + <div id="chat"> + <p><em>Connecting...</em></p> + </div> + <input type="text" id="text" /> + <button type="button" id="send">Send</button> + <script type="text/javascript"> + const chat = document.getElementById('chat'); + const text = document.getElementById('text'); + const uri = 'ws://' + location.host + '/chat'; + const ws = new WebSocket(uri); + + function message(data) { + const line = document.createElement('p'); + line.innerText = data; + chat.appendChild(line); + } + + ws.onopen = function() { + chat.innerHTML = '<p><em>Connected!</em></p>'; + }; + + ws.onmessage = function(msg) { + message(msg.data); + }; + + ws.onclose = function() { + chat.getElementsByTagName('em')[0].innerText = 'Disconnected!'; + }; + + send.onclick = function() { + const msg = text.value; + ws.send(msg); + text.value = ''; + + message('<You>: ' + msg); + }; + </script> + </body> +</html> +"#; diff --git a/third_party/rust/warp/examples/wrapping.rs b/third_party/rust/warp/examples/wrapping.rs new file mode 100644 index 0000000000..bfc84f980f --- /dev/null +++ b/third_party/rust/warp/examples/wrapping.rs @@ -0,0 +1,31 @@ +#![deny(warnings)] +use warp::Filter; + +fn hello_wrapper<F, T>( + filter: F, +) -> impl Filter<Extract = (&'static str,)> + Clone + Send + Sync + 'static +where + F: Filter<Extract = (T,), Error = std::convert::Infallible> + Clone + Send + Sync + 'static, + F::Extract: warp::Reply, +{ + warp::any() + .map(|| { + println!("before filter"); + }) + .untuple_one() + .and(filter) + .map(|_arg| "wrapped hello world") +} + +#[tokio::main] +async fn main() { + // Match any request and return hello world! + let routes = warp::any() + .map(|| "hello world") + .boxed() + .recover(|_err| async { Ok("recovered") }) + // wrap the filter with hello_wrapper + .with(warp::wrap_fn(hello_wrapper)); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} |