diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-0.1.22/examples | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-0.1.22/examples')
16 files changed, 2508 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/examples/README.md b/third_party/rust/tokio-0.1.22/examples/README.md new file mode 100644 index 0000000000..ac9e9b42ff --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/README.md @@ -0,0 +1,62 @@ +## Examples of how to use Tokio + +This directory contains a number of examples showcasing various capabilities of +the `tokio` crate. + +All examples can be executed with: + +``` +cargo run --example $name +``` + +A high level description of each example is: + +* [`hello_world`](hello_world.rs) - a tiny server that writes "hello world" to + all connected clients and then terminates the connection, should help see how + to create and initialize `tokio`. + +* [`echo`](echo.rs) - this is your standard TCP "echo server" which accepts + connections and then echos back any contents that are read from each connected + client. + +* [`print_each_packet`](print_each_packet.rs) - this server will create a TCP + listener, accept connections in a loop, and put down in the stdout everything + that's read off of each TCP connection. + +* [`echo-udp`](echo-udp.rs) - again your standard "echo server", except for UDP + instead of TCP. This will echo back any packets received to the original + sender. + +* [`connect`](connect.rs) - this is a `nc`-like clone which can be used to + interact with most other examples. The program creates a TCP connection or UDP + socket and sends all information read on stdin to the remote peer, displaying + any data received on stdout. Often quite useful when interacting with the + various other servers here! + +* [`chat`](chat.rs) - this spins up a local TCP server which will broadcast from + any connected client to all other connected clients. You can connect to this + in multiple terminals and use it to chat between the terminals. + +* [`chat-combinator`](chat-combinator.rs) - Similar to `chat`, but this uses a + much more functional programming approach using combinators. + +* [`proxy`](proxy.rs) - an example proxy server that will forward all connected + TCP clients to the remote address specified when starting the program. + +* [`tinyhttp`](tinyhttp.rs) - a tiny HTTP/1.1 server which doesn't support HTTP + request bodies showcasing running on multiple cores, working with futures and + spawning tasks, and finally framing a TCP connection to discrete + request/response objects. + +* [`tinydb`](tinydb.rs) - an in-memory database which shows sharing state + between all connected clients, notably the key/value store of this database. + +* [`udp-client`](udp-client.rs) - a simple `send_dgram`/`recv_dgram` example. + +* [`manual-runtime`](manual-runtime.rs) - manually composing a runtime. + +* [`blocking`](blocking.rs) - perform heavy computation in blocking environment. + +If you've got an example you'd like to see here, please feel free to open an +issue. Otherwise if you've got an example you'd like to add, please feel free +to make a PR! diff --git a/third_party/rust/tokio-0.1.22/examples/blocking.rs b/third_party/rust/tokio-0.1.22/examples/blocking.rs new file mode 100644 index 0000000000..e7d5da6c80 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/blocking.rs @@ -0,0 +1,87 @@ +//! An example of using blocking funcion annotation. +//! +//! This example will create 8 "heavy computation" blocking futures and 8 +//! non-blocking futures with 4 threads core threads in runtime. +//! Each non-blocking future will print it's id and return immideatly. +//! Each blocking future will print it's id on start, sleep for 1000 ms, print +//! it's id and return. +//! +//! Note how non-blocking threads are executed before blocking threads finish +//! their task. + +extern crate tokio; +extern crate tokio_threadpool; + +use std::thread; +use std::time::Duration; +use tokio::prelude::*; +use tokio::runtime::Builder; +use tokio_threadpool::blocking; + +/// This future blocks it's poll method for 1000 ms. +struct BlockingFuture { + value: i32, +} + +impl Future for BlockingFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + println!("Blocking begin: {}!", self.value); + // Try replacing this part with commnted code + blocking(|| { + println!("Blocking part annotated: {}!", self.value); + thread::sleep(Duration::from_millis(1000)); + println!("Blocking done annotated: {}!", self.value); + }) + .map_err(|err| panic!("Error in blocing block: {:?}", err)) + // println!("Blocking part annotated: {}!", self.value); + // thread::sleep(Duration::from_millis(1000)); + // println!("Blocking done annotated: {}!", self.value); + // Ok(Async::Ready(())) + } +} + +/// This future returns immideatly. +struct NonBlockingFuture { + value: i32, +} + +impl Future for NonBlockingFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + println!("Non-blocking done: {}!", self.value); + Ok(Async::Ready(())) + } +} + +/// This future spawns child futures. +struct SpawningFuture; + +impl Future for SpawningFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + for i in 0..8 { + let blocking_future = BlockingFuture { value: i }; + + tokio::spawn(blocking_future); + } + for i in 0..8 { + let non_blocking_future = NonBlockingFuture { value: i }; + tokio::spawn(non_blocking_future); + } + Ok(Async::Ready(())) + } +} + +fn main() { + let spawning_future = SpawningFuture; + + let runtime = Builder::new().core_threads(4).build().unwrap(); + runtime.block_on_all(spawning_future).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs b/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs new file mode 100644 index 0000000000..ee147025d2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs @@ -0,0 +1,172 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a line-based server which accepts connections, reads lines from +//! those connections, and broadcasts the lines to all other connected clients. +//! +//! This example is similar to chat.rs, but uses combinators and a much more +//! functional style. +//! +//! Because we are here running the reactor/executor on the same thread instead +//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use +//! Rc + RefCell instead. The max performance is however limited to a CPU HW +//! thread. +//! +//! You can test this out by running: +//! +//! cargo run --example chat-combinator-current-thread +//! +//! And then in another window run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate futures; +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio::runtime::current_thread::{Runtime, TaskExecutor}; + +use std::cell::RefCell; +use std::collections::HashMap; +use std::env; +use std::io::BufReader; +use std::iter; +use std::rc::Rc; + +fn main() -> Result<(), Box<std::error::Error>> { + let mut runtime = Runtime::new().unwrap(); + + // Create the TCP listener we'll accept connections on. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse()?; + + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // This is running on the Tokio current_thread runtime, so it will be single- + // threaded. The `Rc<RefCell<...>>` allows state to be shared across the tasks. + let connections = Rc::new(RefCell::new(HashMap::new())); + + // The server task asynchronously iterates over and processes each incoming + // connection. + let srv = socket + .incoming() + .map_err(|e| { + println!("failed to accept socket; error = {:?}", e); + e + }) + .for_each(move |stream| { + // The client's socket address + let addr = stream.peer_addr()?; + + println!("New Connection: {}", addr); + + // Split the TcpStream into two separate handles. One handle for reading + // and one handle for writing. This lets us use separate tasks for + // reading and writing. + let (reader, writer) = stream.split(); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + let mut conns = connections.borrow_mut(); + conns.insert(addr, tx); + + // Define here what we do for the actual I/O. That is, read a bunch of + // lines from the socket and dispatch them while we also write any lines + // from other sockets. + let connections_inner = connections.clone(); + let reader = BufReader::new(reader); + + // Model the read portion of this socket by mapping an infinite + // iterator to each line off the socket. This "loop" is then + // terminated with an error once we hit EOF on the socket. + let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); + + let socket_reader = iter.fold(reader, move |reader, _| { + // Read a line off the socket, failing if we're at EOF + let line = io::read_until(reader, b'\n', Vec::new()); + let line = line.and_then(|(reader, vec)| { + if vec.len() == 0 { + Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) + } else { + Ok((reader, vec)) + } + }); + + // Convert the bytes we read into a string, and then send that + // string to all other connected clients. + let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); + + // Move the connection state into the closure below. + let connections = connections_inner.clone(); + + line.map(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let mut conns = connections.borrow_mut(); + + if let Ok(msg) = message { + // For each open connection except the sender, send the + // string via the channel. + let iter = conns + .iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + } + } else { + let tx = conns.get_mut(&addr).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()) + .unwrap(); + } + + reader + }) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf<TcpStream>`. + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt.map_err(|_| ()) + }); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connections = connections.clone(); + let socket_reader = socket_reader.map_err(|_| ()); + let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); + + // Spawn locally a task to process the connection + TaskExecutor::current() + .spawn_local(Box::new(connection.then(move |_| { + let mut conns = connections.borrow_mut(); + conns.remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + }))) + .unwrap(); + + Ok(()) + }) + .map_err(|err| println!("error occurred: {:?}", err)); + + // Spawn srv itself + runtime.spawn(srv); + + // Execute server + runtime.run().unwrap(); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs b/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs new file mode 100644 index 0000000000..b81e8f7c35 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs @@ -0,0 +1,156 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a line-based server which accepts connections, reads lines from +//! those connections, and broadcasts the lines to all other connected clients. +//! +//! This example is similar to chat.rs, but uses combinators and a much more +//! functional style. +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another window run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate futures; +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; + +use std::collections::HashMap; +use std::env; +use std::io::BufReader; +use std::iter; +use std::sync::{Arc, Mutex}; + +fn main() -> Result<(), Box<std::error::Error>> { + // Create the TCP listener we'll accept connections on. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse()?; + + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // This is running on the Tokio runtime, so it will be multi-threaded. The + // `Arc<Mutex<...>>` allows state to be shared across the threads. + let connections = Arc::new(Mutex::new(HashMap::new())); + + // The server task asynchronously iterates over and processes each incoming + // connection. + let srv = socket + .incoming() + .map_err(|e| { + println!("failed to accept socket; error = {:?}", e); + e + }) + .for_each(move |stream| { + // The client's socket address + let addr = stream.peer_addr()?; + + println!("New Connection: {}", addr); + + // Split the TcpStream into two separate handles. One handle for reading + // and one handle for writing. This lets us use separate tasks for + // reading and writing. + let (reader, writer) = stream.split(); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + connections.lock().unwrap().insert(addr, tx); + + // Define here what we do for the actual I/O. That is, read a bunch of + // lines from the socket and dispatch them while we also write any lines + // from other sockets. + let connections_inner = connections.clone(); + let reader = BufReader::new(reader); + + // Model the read portion of this socket by mapping an infinite + // iterator to each line off the socket. This "loop" is then + // terminated with an error once we hit EOF on the socket. + let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); + + let socket_reader = iter.fold(reader, move |reader, _| { + // Read a line off the socket, failing if we're at EOF + let line = io::read_until(reader, b'\n', Vec::new()); + let line = line.and_then(|(reader, vec)| { + if vec.len() == 0 { + Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) + } else { + Ok((reader, vec)) + } + }); + + // Convert the bytes we read into a string, and then send that + // string to all other connected clients. + let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); + + // Move the connection state into the closure below. + let connections = connections_inner.clone(); + + line.map(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let mut conns = connections.lock().unwrap(); + + if let Ok(msg) = message { + // For each open connection except the sender, send the + // string via the channel. + let iter = conns + .iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + } + } else { + let tx = conns.get_mut(&addr).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()) + .unwrap(); + } + + reader + }) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf<TcpStream>`. + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt.map_err(|_| ()) + }); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connections = connections.clone(); + let socket_reader = socket_reader.map_err(|_| ()); + let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); + + // Spawn a task to process the connection + tokio::spawn(connection.then(move |_| { + connections.lock().unwrap().remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + })); + + Ok(()) + }) + .map_err(|err| println!("error occurred: {:?}", err)); + + // execute server + tokio::run(srv); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/chat.rs b/third_party/rust/tokio-0.1.22/examples/chat.rs new file mode 100644 index 0000000000..b21432afa2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/chat.rs @@ -0,0 +1,473 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This example is explicitly more verbose than it has to be. This is to +//! illustrate more concepts. +//! +//! A chat server for telnet clients. After a telnet client connects, the first +//! line should contain the client's name. After that, all lines sent by a +//! client are broadcasted to all other connected clients. +//! +//! Because the client is telnet, lines are delimited by "\r\n". +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! +//! You can run the `telnet` command in any number of additional windows. +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate tokio; +#[macro_use] +extern crate futures; +extern crate bytes; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::future::{self, Either}; +use futures::sync::mpsc; +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender<Bytes>; + +/// Shorthand for the receive half of the message channel. +type Rx = mpsc::UnboundedReceiver<Bytes>; + +/// Data that is shared between all peers in the chat server. +/// +/// This is the set of `Tx` handles for all connected clients. Whenever a +/// message is received from a client, it is broadcasted to all peers by +/// iterating over the `peers` entries and sending a copy of the message on each +/// `Tx`. +struct Shared { + peers: HashMap<SocketAddr, Tx>, +} + +/// The state for each connected client. +struct Peer { + /// Name of the peer. + /// + /// When a client connects, the first line sent is treated as the client's + /// name (like alice or bob). The name is used to preface all messages that + /// arrive from the client so that we can simulate a real chat server: + /// + /// ```text + /// alice: Hello everyone. + /// bob: Welcome to telnet chat! + /// ``` + name: BytesMut, + + /// The TCP socket wrapped with the `Lines` codec, defined below. + /// + /// This handles sending and receiving data on the socket. When using + /// `Lines`, we can work at the line level instead of having to manage the + /// raw byte operations. + lines: Lines, + + /// Handle to the shared chat state. + /// + /// This is used to broadcast messages read off the socket to all connected + /// peers. + state: Arc<Mutex<Shared>>, + + /// Receive half of the message channel. + /// + /// This is used to receive messages from peers. When a message is received + /// off of this `Rx`, it will be written to the socket. + rx: Rx, + + /// Client socket address. + /// + /// The socket address is used as the key in the `peers` HashMap. The + /// address is saved so that the `Peer` drop implementation can clean up its + /// entry. + addr: SocketAddr, +} + +/// Line based codec +/// +/// This decorates a socket and presents a line based read / write interface. +/// +/// As a user of `Lines`, we can focus on working at the line level. So, we send +/// and receive values that represent entire lines. The `Lines` codec will +/// handle the encoding and decoding as well as reading from and writing to the +/// socket. +#[derive(Debug)] +struct Lines { + /// The TCP socket. + socket: TcpStream, + + /// Buffer used when reading from the socket. Data is not returned from this + /// buffer until an entire line has been read. + rd: BytesMut, + + /// Buffer used to stage data before writing it to the socket. + wr: BytesMut, +} + +impl Shared { + /// Create a new, empty, instance of `Shared`. + fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } +} + +impl Peer { + /// Create a new instance of `Peer`. + fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer { + // Get the client socket address + let addr = lines.socket.peer_addr().unwrap(); + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded(); + + // Add an entry for this `Peer` in the shared state map. + state.lock().unwrap().peers.insert(addr, tx); + + Peer { + name, + lines, + state, + rx, + addr, + } + } +} + +/// This is where a connected client is managed. +/// +/// A `Peer` is also a future representing completely processing the client. +/// +/// When a `Peer` is created, the first line (representing the client's name) +/// has already been read. When the socket closes, the `Peer` future completes. +/// +/// While processing, the peer future implementation will: +/// +/// 1) Receive messages on its message channel and write them to the socket. +/// 2) Receive messages from the socket and broadcast them to all peers. +/// +impl Future for Peer { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + // Tokio (and futures) use cooperative scheduling without any + // preemption. If a task never yields execution back to the executor, + // then other tasks may be starved. + // + // To deal with this, robust applications should not have any unbounded + // loops. In this example, we will read at most `LINES_PER_TICK` lines + // from the client on each tick. + // + // If the limit is hit, the current task is notified, informing the + // executor to schedule the task again asap. + const LINES_PER_TICK: usize = 10; + + // Receive all messages from peers. + for i in 0..LINES_PER_TICK { + // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is + // safe. + match self.rx.poll().unwrap() { + Async::Ready(Some(v)) => { + // Buffer the line. Once all lines are buffered, they will + // be flushed to the socket (right below). + self.lines.buffer(&v); + + // If this is the last iteration, the loop will break even + // though there could still be lines to read. Because we did + // not reach `Async::NotReady`, we have to notify ourselves + // in order to tell the executor to schedule the task again. + if i + 1 == LINES_PER_TICK { + task::current().notify(); + } + } + _ => break, + } + } + + // Flush the write buffer to the socket + let _ = self.lines.poll_flush()?; + + // Read new lines from the socket + while let Async::Ready(line) = self.lines.poll()? { + println!("Received line ({:?}) : {:?}", self.name, line); + + if let Some(message) = line { + // Append the peer's name to the front of the line: + let mut line = self.name.clone(); + line.extend_from_slice(b": "); + line.extend_from_slice(&message); + line.extend_from_slice(b"\r\n"); + + // We're using `Bytes`, which allows zero-copy clones (by + // storing the data in an Arc internally). + // + // However, before cloning, we must freeze the data. This + // converts it from mutable -> immutable, allowing zero copy + // cloning. + let line = line.freeze(); + + // Now, send the line to all other peers + for (addr, tx) in &self.state.lock().unwrap().peers { + // Don't send the message to ourselves + if *addr != self.addr { + // The send only fails if the rx half has been dropped, + // however this is impossible as the `tx` half will be + // removed from the map before the `rx` is dropped. + tx.unbounded_send(line.clone()).unwrap(); + } + } + } else { + // EOF was reached. The remote client has disconnected. There is + // nothing more to do. + return Ok(Async::Ready(())); + } + } + + // As always, it is important to not just return `NotReady` without + // ensuring an inner future also returned `NotReady`. + // + // We know we got a `NotReady` from either `self.rx` or `self.lines`, so + // the contract is respected. + Ok(Async::NotReady) + } +} + +impl Drop for Peer { + fn drop(&mut self) { + self.state.lock().unwrap().peers.remove(&self.addr); + } +} + +impl Lines { + /// Create a new `Lines` codec backed by the socket + fn new(socket: TcpStream) -> Self { + Lines { + socket, + rd: BytesMut::new(), + wr: BytesMut::new(), + } + } + + /// Buffer a line. + /// + /// This writes the line to an internal buffer. Calls to `poll_flush` will + /// attempt to flush this buffer to the socket. + fn buffer(&mut self, line: &[u8]) { + // Ensure the buffer has capacity. Ideally this would not be unbounded, + // but to keep the example simple, we will not limit this. + self.wr.reserve(line.len()); + + // Push the line onto the end of the write buffer. + // + // The `put` function is from the `BufMut` trait. + self.wr.put(line); + } + + /// Flush the write buffer to the socket + fn poll_flush(&mut self) -> Poll<(), io::Error> { + // As long as there is buffered data to write, try to write it. + while !self.wr.is_empty() { + // Try to write some bytes to the socket + let n = try_ready!(self.socket.poll_write(&self.wr)); + + // As long as the wr is not empty, a successful write should + // never write 0 bytes. + assert!(n > 0); + + // This discards the first `n` bytes of the buffer. + let _ = self.wr.split_to(n); + } + + Ok(Async::Ready(())) + } + + /// Read data from the socket. + /// + /// This only returns `Ready` when the socket has closed. + fn fill_read_buf(&mut self) -> Poll<(), io::Error> { + loop { + // Ensure the read buffer has capacity. + // + // This might result in an internal allocation. + self.rd.reserve(1024); + + // Read data into the buffer. + let n = try_ready!(self.socket.read_buf(&mut self.rd)); + + if n == 0 { + return Ok(Async::Ready(())); + } + } + } +} + +impl Stream for Lines { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + // First, read any new data that might have been received off the socket + let sock_closed = self.fill_read_buf()?.is_ready(); + + // Now, try finding lines + let pos = self + .rd + .windows(2) + .enumerate() + .find(|&(_, bytes)| bytes == b"\r\n") + .map(|(i, _)| i); + + if let Some(pos) = pos { + // Remove the line from the read buffer and set it to `line`. + let mut line = self.rd.split_to(pos + 2); + + // Drop the trailing \r\n + line.split_off(pos); + + // Return the line + return Ok(Async::Ready(Some(line))); + } + + if sock_closed { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +/// Spawn a task to manage the socket. +/// +/// This will read the first line from the socket to identify the client, then +/// add the client to the set of connected peers in the chat service. +fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { + // Wrap the socket with the `Lines` codec that we wrote above. + // + // By doing this, we can operate at the line level instead of doing raw byte + // manipulation. + let lines = Lines::new(socket); + + // The first line is treated as the client's name. The client is not added + // to the set of connected peers until this line is received. + // + // We use the `into_future` combinator to extract the first item from the + // lines stream. `into_future` takes a `Stream` and converts it to a future + // of `(first, rest)` where `rest` is the original stream instance. + let connection = lines + .into_future() + // `into_future` doesn't have the right error type, so map the error to + // make it work. + .map_err(|(e, _)| e) + // Process the first received line as the client's name. + .and_then(|(name, lines)| { + // If `name` is `None`, then the client disconnected without + // actually sending a line of data. + // + // Since the connection is closed, there is no further work that we + // need to do. So, we just terminate processing by returning + // `future::ok()`. + // + // The problem is that only a single future type can be returned + // from a combinator closure, but we want to return both + // `future::ok()` and `Peer` (below). + // + // This is a common problem, so the `futures` crate solves this by + // providing the `Either` helper enum that allows creating a single + // return type that covers two concrete future types. + let name = match name { + Some(name) => name, + None => { + // The remote client closed the connection without sending + // any data. + return Either::A(future::ok(())); + } + }; + + println!("`{:?}` is joining the chat", name); + + // Create the peer. + // + // This is also a future that processes the connection, only + // completing when the socket closes. + let peer = Peer::new(name, state, lines); + + // Wrap `peer` with `Either::B` to make the return type fit. + Either::B(peer) + }) + // Task futures have an error of type `()`, this ensures we handle the + // error. We do this by printing the error to STDOUT. + .map_err(|e| { + println!("connection error = {:?}", e); + }); + + // Spawn the task. Internally, this submits the task to a thread pool. + tokio::spawn(connection); +} + +pub fn main() -> Result<(), Box<std::error::Error>> { + // Create the shared state. This is how all the peers communicate. + // + // The server task will hold a handle to this. For every new client, the + // `state` handle is cloned and passed into the task that processes the + // client connection. + let state = Arc::new(Mutex::new(Shared::new())); + + let addr = "127.0.0.1:6142".parse()?; + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let listener = TcpListener::bind(&addr)?; + + // The server task asynchronously iterates over and processes each + // incoming connection. + let server = listener + .incoming() + .for_each(move |socket| { + // Spawn a task to process the connection + process(socket, state.clone()); + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); + }); + + println!("server running on localhost:6142"); + + // Start the Tokio runtime. + // + // The Tokio is a pre-configured "out of the box" runtime for building + // asynchronous applications. It includes both a reactor and a task + // scheduler. This means applications are multithreaded by default. + // + // This function blocks until the runtime reaches an idle state. Idle is + // defined as all spawned tasks have completed and all I/O resources (TCP + // sockets in our case) have been dropped. + // + // In our example, we have not defined a shutdown strategy, so this will + // block until `ctrl-c` is pressed at the terminal. + tokio::run(server); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/connect.rs b/third_party/rust/tokio-0.1.22/examples/connect.rs new file mode 100644 index 0000000000..4dc0ea31e2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/connect.rs @@ -0,0 +1,257 @@ +//! An example of hooking up stdin/stdout to either a TCP or UDP stream. +//! +//! This example will connect to a socket address specified in the argument list +//! and then forward all data read on stdin to the server, printing out all data +//! received on stdout. An optional `--udp` argument can be passed to specify +//! that the connection should be made over UDP instead of TCP, translating each +//! line entered on stdin to a UDP packet to be sent to the remote address. +//! +//! Note that this is not currently optimized for performance, especially +//! around buffer management. Rather it's intended to show an example of +//! working with a client. +//! +//! This example can be quite useful when interacting with the other examples in +//! this repository! Many of them recommend running this as a simple "hook up +//! stdin/stdout to a server" to get up and running. + +#![deny(warnings)] + +extern crate bytes; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::env; +use std::io::{self, Read, Write}; +use std::net::SocketAddr; +use std::thread; + +use futures::sync::mpsc; +use tokio::prelude::*; + +fn main() -> Result<(), Box<std::error::Error>> { + // Determine if we're going to run in TCP or UDP mode + let mut args = env::args().skip(1).collect::<Vec<_>>(); + let tcp = match args.iter().position(|a| a == "--udp") { + Some(i) => { + args.remove(i); + false + } + None => true, + }; + + // Parse what address we're going to connect to + let addr = match args.first() { + Some(addr) => addr, + None => Err("this program requires at least one argument")?, + }; + let addr = addr.parse::<SocketAddr>()?; + + // Right now Tokio doesn't support a handle to stdin running on the event + // loop, so we farm out that work to a separate thread. This thread will + // read data (with blocking I/O) from stdin and then send it to the event + // loop over a standard futures channel. + let (stdin_tx, stdin_rx) = mpsc::channel(0); + thread::spawn(|| read_stdin(stdin_tx)); + let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx")); + + // Now that we've got our stdin read we either set up our TCP connection or + // our UDP connection to get a stream of bytes we're going to emit to + // stdout. + let stdout = if tcp { + tcp::connect(&addr, Box::new(stdin_rx))? + } else { + udp::connect(&addr, Box::new(stdin_rx))? + }; + + // And now with our stream of bytes to write to stdout, we execute that in + // the event loop! Note that this is doing blocking I/O to emit data to + // stdout, and in general it's a no-no to do that sort of work on the event + // loop. In this case, though, we know it's ok as the event loop isn't + // otherwise running anything useful. + let mut out = io::stdout(); + + tokio::run({ + stdout + .for_each(move |chunk| out.write_all(&chunk)) + .map_err(|e| println!("error reading stdout; error = {:?}", e)) + }); + Ok(()) +} + +mod codec { + use bytes::{BufMut, BytesMut}; + use std::io; + use tokio::codec::{Decoder, Encoder}; + + /// A simple `Codec` implementation that just ships bytes around. + /// + /// This type is used for "framing" a TCP/UDP stream of bytes but it's really + /// just a convenient method for us to work with streams/sinks for now. + /// This'll just take any data read and interpret it as a "frame" and + /// conversely just shove data into the output location without looking at + /// it. + pub struct Bytes; + + impl Decoder for Bytes { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } + } + + impl Encoder for Bytes { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { + buf.put(&data[..]); + Ok(()) + } + } +} + +mod tcp { + use tokio; + use tokio::codec::Decoder; + use tokio::net::TcpStream; + use tokio::prelude::*; + + use bytes::BytesMut; + use codec::Bytes; + + use std::error::Error; + use std::io; + use std::net::SocketAddr; + + pub fn connect( + addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, + ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> { + let tcp = TcpStream::connect(addr); + + // After the TCP connection has been established, we set up our client + // to start forwarding data. + // + // First we use the `Io::framed` method with a simple implementation of + // a `Codec` (listed below) that just ships bytes around. We then split + // that in two to work with the stream and sink separately. + // + // Half of the work we're going to do is to take all data we receive on + // `stdin` and send that along the TCP stream (`sink`). The second half + // is to take all the data we receive (`stream`) and then write that to + // stdout. We'll be passing this handle back out from this method. + // + // You'll also note that we *spawn* the work to read stdin and write it + // to the TCP stream. This is done to ensure that happens concurrently + // with us reading data from the stream. + let stream = Box::new( + tcp.map(move |stream| { + let (sink, stream) = Bytes.framed(stream).split(); + + tokio::spawn(stdin.forward(sink).then(|result| { + if let Err(e) = result { + println!("failed to write to socket: {}", e) + } + Ok(()) + })); + + stream + }) + .flatten_stream(), + ); + Ok(stream) + } +} + +mod udp { + use std::error::Error; + use std::io; + use std::net::SocketAddr; + + use bytes::BytesMut; + use tokio; + use tokio::net::{UdpFramed, UdpSocket}; + use tokio::prelude::*; + + use codec::Bytes; + + pub fn connect( + &addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, + ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> { + // We'll bind our UDP socket to a local IP/port, but for now we + // basically let the OS pick both of those. + let addr_to_bind = if addr.ip().is_ipv4() { + "0.0.0.0:0".parse()? + } else { + "[::]:0".parse()? + }; + let udp = match UdpSocket::bind(&addr_to_bind) { + Ok(udp) => udp, + Err(_) => Err("failed to bind socket")?, + }; + + // Like above with TCP we use an instance of `Bytes` codec to transform + // this UDP socket into a framed sink/stream which operates over + // discrete values. In this case we're working with *pairs* of socket + // addresses and byte buffers. + let (sink, stream) = UdpFramed::new(udp, Bytes).split(); + + // All bytes from `stdin` will go to the `addr` specified in our + // argument list. Like with TCP this is spawned concurrently + let forward_stdin = stdin + .map(move |chunk| (chunk, addr)) + .forward(sink) + .then(|result| { + if let Err(e) = result { + println!("failed to write to socket: {}", e) + } + Ok(()) + }); + + // With UDP we could receive data from any source, so filter out + // anything coming from a different address + let receive = stream.filter_map(move |(chunk, src)| { + if src == addr { + Some(chunk.into()) + } else { + None + } + }); + + let stream = Box::new( + future::lazy(|| { + tokio::spawn(forward_stdin); + future::ok(receive) + }) + .flatten_stream(), + ); + Ok(stream) + } +} + +// Our helper method which will read data from stdin and send it along the +// sender provided. +fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) { + let mut stdin = io::stdin(); + loop { + let mut buf = vec![0; 1024]; + let n = match stdin.read(&mut buf) { + Err(_) | Ok(0) => break, + Ok(n) => n, + }; + buf.truncate(n); + tx = match tx.send(buf).wait() { + Ok(tx) => tx, + Err(_) => break, + }; + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/echo-udp.rs b/third_party/rust/tokio-0.1.22/examples/echo-udp.rs new file mode 100644 index 0000000000..93ebca799d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/echo-udp.rs @@ -0,0 +1,74 @@ +//! An UDP echo server that just sends back everything that it receives. +//! +//! If you're on Unix you can test this out by in one terminal executing: +//! +//! cargo run --example echo-udp +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect -- --udp 127.0.0.1:8080 +//! +//! Each line you type in to the `nc` terminal should be echo'd back to you! + +#![deny(warnings)] + +#[macro_use] +extern crate futures; +extern crate tokio; + +use std::net::SocketAddr; +use std::{env, io}; + +use tokio::net::UdpSocket; +use tokio::prelude::*; + +struct Server { + socket: UdpSocket, + buf: Vec<u8>, + to_send: Option<(usize, SocketAddr)>, +} + +impl Future for Server { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + loop { + // First we check to see if there's a message we need to echo back. + // If so then we try to send it back to the original source, waiting + // until it's writable and we're able to do so. + if let Some((size, peer)) = self.to_send { + let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); + println!("Echoed {}/{} bytes to {}", amt, size, peer); + self.to_send = None; + } + + // If we're here then `to_send` is `None`, so we take a look for the + // next message we're going to echo back. + self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); + } + } +} + +fn main() -> Result<(), Box<std::error::Error>> { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + let socket = UdpSocket::bind(&addr)?; + println!("Listening on: {}", socket.local_addr()?); + + let server = Server { + socket: socket, + buf: vec![0; 1024], + to_send: None, + }; + + // This starts the server task. + // + // `map_err` handles the error by logging it and maps the future to a type + // that can be spawned. + // + // `tokio::run` spawns the task on the Tokio runtime and starts running. + tokio::run(server.map_err(|e| println!("server error = {:?}", e))); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/echo.rs b/third_party/rust/tokio-0.1.22/examples/echo.rs new file mode 100644 index 0000000000..45f808f89d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/echo.rs @@ -0,0 +1,115 @@ +//! A "hello world" echo server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! write back everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example echo +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be echo'd back to +//! you! If you open up multiple terminals running the `connect` example you +//! should be able to see them all make progress simultaneously. + +#![deny(warnings)] + +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; + +use std::env; +use std::net::SocketAddr; + +fn main() -> Result<(), Box<std::error::Error>> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // Here we convert the `TcpListener` to a stream of incoming connections + // with the `incoming` method. We then define how to process each element in + // the stream with the `for_each` method. + // + // This combinator, defined on the `Stream` trait, will allow us to define a + // computation to happen for all items on the stream (in this case TCP + // connections made to the server). The return value of the `for_each` + // method is itself a future representing processing the entire stream of + // connections, and ends up being our server. + let done = socket + .incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + // Once we're inside this closure this represents an accepted client + // from our server. The `socket` is the client connection (similar to + // how the standard library operates). + // + // We just want to copy all data read from the socket back onto the + // socket itself (e.g. "echo"). We can use the standard `io::copy` + // combinator in the `tokio-core` crate to do precisely this! + // + // The `copy` function takes two arguments, where to read from and where + // to write to. We only have one argument, though, with `socket`. + // Luckily there's a method, `Io::split`, which will split an Read/Write + // stream into its two halves. This operation allows us to work with + // each stream independently, such as pass them as two arguments to the + // `copy` function. + // + // The `copy` function then returns a future, and this future will be + // resolved when the copying operation is complete, resolving to the + // amount of data that was copied. + let (reader, writer) = socket.split(); + let amt = io::copy(reader, writer); + + // After our copy operation is complete we just print out some helpful + // information. + let msg = amt.then(move |result| { + match result { + Ok((amt, _, _)) => println!("wrote {} bytes", amt), + Err(e) => println!("error: {}", e), + } + + Ok(()) + }); + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // This function will transfer ownership of the future (`msg` in this + // case) to the Tokio runtime thread pool that. The thread pool will + // drive the future to completion. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(msg) + }); + + // And finally now that we've define what our server is, we run it! + // + // This starts the Tokio runtime, spawns the server task, and blocks the + // current thread until all tasks complete execution. Since the `done` task + // never completes (it just keeps accepting sockets), `tokio::run` blocks + // forever (until ctrl-c is pressed). + tokio::run(done); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/hello_world.rs b/third_party/rust/tokio-0.1.22/examples/hello_world.rs new file mode 100644 index 0000000000..c82762691a --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/hello_world.rs @@ -0,0 +1,58 @@ +//! Hello world server. +//! +//! A simple client that opens a TCP stream, writes "hello world\n", and closes +//! the connection. +//! +//! You can test this out by running: +//! +//! ncat -l 6142 +//! +//! And then in another terminal run: +//! +//! cargo run --example hello_world + +#![deny(warnings)] + +extern crate tokio; + +use tokio::io; +use tokio::net::TcpStream; +use tokio::prelude::*; + +pub fn main() -> Result<(), Box<std::error::Error>> { + let addr = "127.0.0.1:6142".parse()?; + + // Open a TCP stream to the socket address. + // + // Note that this is the Tokio TcpStream, which is fully async. + let client = TcpStream::connect(&addr) + .and_then(|stream| { + println!("created stream"); + io::write_all(stream, "hello world\n").then(|result| { + println!("wrote to stream; success={:?}", result.is_ok()); + Ok(()) + }) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("connection error = {:?}", err); + }); + + // Start the Tokio runtime. + // + // The Tokio is a pre-configured "out of the box" runtime for building + // asynchronous applications. It includes both a reactor and a task + // scheduler. This means applications are multithreaded by default. + // + // This function blocks until the runtime reaches an idle state. Idle is + // defined as all spawned tasks have completed and all I/O resources (TCP + // sockets in our case) have been dropped. + println!("About to create the stream and write to it..."); + tokio::run(client); + println!("Stream has been created and written to."); + + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs b/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs new file mode 100644 index 0000000000..8e3e129965 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs @@ -0,0 +1,87 @@ +//! An example how to manually assemble a runtime and run some tasks on it. +//! +//! This is closer to the single-threaded runtime than the default tokio one, as it is simpler to +//! grasp. There are conceptually similar, but the multi-threaded one would be more code. If you +//! just want to *use* a single-threaded runtime, use the one provided by tokio directly +//! (`tokio::runtime::current_thread::Runtime::new()`. This is a demonstration only. +//! +//! Note that the error handling is a bit left out. Also, the `run` could be modified to return the +//! result of the provided future. + +extern crate futures; +extern crate tokio; +extern crate tokio_current_thread; +extern crate tokio_executor; +extern crate tokio_reactor; +extern crate tokio_timer; + +use std::io::Error as IoError; +use std::time::{Duration, Instant}; + +use futures::{future, Future}; +use tokio_current_thread::CurrentThread; +use tokio_reactor::Reactor; +use tokio_timer::timer::{self, Timer}; + +/// Creates a "runtime". +/// +/// This is similar to running `tokio::runtime::current_thread::Runtime::new()`. +fn run<F: Future<Item = (), Error = ()>>(f: F) -> Result<(), IoError> { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new(reactor); + let timer_handle = timer.handle(); + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor generate some new stimuli for the + // futures to continue in their life. + let mut executor = CurrentThread::new_with_park(timer); + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + // This will set the default handle and timer to use inside the closure and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the current single-threaded + // executor when used. This is a trick, because we need two mutable references to the + // executor (one to run the provided future, another to install as the default one). We + // use the fake one here as the default one. + let mut default_executor = tokio_current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + // Run the provided future + executor.block_on(f).unwrap(); + // Run all the other futures that are still left in the executor + executor.run().unwrap(); + }); + }); + }); + Ok(()) +} + +fn main() -> Result<(), Box<std::error::Error>> { + run(future::lazy(|| { + // Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn(). + // It also can use the default reactor and create timeouts. + + // Connect somewhere. And then do nothing with it. Yes, useless. + // + // This will use the default reactor which runs in the current thread. + let connect = tokio::net::TcpStream::connect(&"127.0.0.1:53".parse().unwrap()) + .map(|_| println!("Connected")) + .map_err(|e| println!("Failed to connect: {}", e)); + // We can spawn it without requiring Send. This would panic if we run it outside of the + // `run` (or outside of anything else) + tokio_current_thread::spawn(connect); + + // We can also create timeouts. + let deadline = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(5)) + .map(|()| println!("5 seconds are over")) + .map_err(|e| println!("Failed to wait: {}", e)); + // We can spawn on the default executor, which is also the local one. + tokio::executor::spawn(deadline); + Ok(()) + }))?; + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs b/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs new file mode 100644 index 0000000000..94a606483c --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs @@ -0,0 +1,150 @@ +//! A "print-each-packet" server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! put down in the stdout everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example print\_each\_packet +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be written to terminal! +//! +//! Minimal js example: +//! +//! ```js +//! var net = require("net"); +//! +//! var listenPort = 8080; +//! +//! var server = net.createServer(function (socket) { +//! socket.on("data", function (bytes) { +//! console.log("bytes", bytes); +//! }); +//! +//! socket.on("end", function() { +//! console.log("Socket received FIN packet and closed connection"); +//! }); +//! socket.on("error", function (error) { +//! console.log("Socket closed with error", error); +//! }); +//! +//! socket.on("close", function (with_error) { +//! if (with_error) { +//! console.log("Socket closed with result: Err(SomeError)"); +//! } else { +//! console.log("Socket closed with result: Ok(())"); +//! } +//! }); +//! +//! }); +//! +//! server.listen(listenPort); +//! +//! console.log("Listening on:", listenPort); +//! ``` +//! + +#![deny(warnings)] + +extern crate tokio; +extern crate tokio_codec; + +use tokio::codec::Decoder; +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio_codec::BytesCodec; + +use std::env; +use std::net::SocketAddr; + +fn main() -> Result<(), Box<std::error::Error>> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // Here we convert the `TcpListener` to a stream of incoming connections + // with the `incoming` method. We then define how to process each element in + // the stream with the `for_each` method. + // + // This combinator, defined on the `Stream` trait, will allow us to define a + // computation to happen for all items on the stream (in this case TCP + // connections made to the server). The return value of the `for_each` + // method is itself a future representing processing the entire stream of + // connections, and ends up being our server. + let done = socket + .incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + // Once we're inside this closure this represents an accepted client + // from our server. The `socket` is the client connection (similar to + // how the standard library operates). + // + // We're parsing each socket with the `BytesCodec` included in `tokio_io`, + // and then we `split` each codec into the reader/writer halves. + // + // See https://docs.rs/tokio-codec/0.1/src/tokio_codec/bytes_codec.rs.html + let framed = BytesCodec::new().framed(socket); + let (_writer, reader) = framed.split(); + + let processor = reader + .for_each(|bytes| { + println!("bytes: {:?}", bytes); + Ok(()) + }) + // After our copy operation is complete we just print out some helpful + // information. + .and_then(|()| { + println!("Socket received FIN packet and closed connection"); + Ok(()) + }) + .or_else(|err| { + println!("Socket closed with error: {:?}", err); + // We have to return the error to catch it in the next ``.then` call + Err(err) + }) + .then(|result| { + println!("Socket closed with result: {:?}", result); + Ok(()) + }); + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // This function will transfer ownership of the future (`msg` in this + // case) to the Tokio runtime thread pool that. The thread pool will + // drive the future to completion. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(processor) + }); + + // And finally now that we've define what our server is, we run it! + // + // This starts the Tokio runtime, spawns the server task, and blocks the + // current thread until all tasks complete execution. Since the `done` task + // never completes (it just keeps accepting sockets), `tokio::run` blocks + // forever (until ctrl-c is pressed). + tokio::run(done); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/proxy.rs b/third_party/rust/tokio-0.1.22/examples/proxy.rs new file mode 100644 index 0000000000..2cbcf119a2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/proxy.rs @@ -0,0 +1,130 @@ +//! A proxy that forwards data to another server and forwards that server's +//! responses back to clients. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! You can showcase this by running this in one terminal: +//! +//! cargo run --example proxy +//! +//! This in another terminal +//! +//! cargo run --example echo +//! +//! And finally this in another terminal +//! +//! cargo run --example connect 127.0.0.1:8081 +//! +//! This final terminal will connect to our proxy, which will in turn connect to +//! the echo server, and you'll be able to see data flowing between them. + +#![deny(warnings)] + +extern crate tokio; + +use std::env; +use std::io::{self, Read, Write}; +use std::net::{Shutdown, SocketAddr}; +use std::sync::{Arc, Mutex}; + +use tokio::io::{copy, shutdown}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +fn main() -> Result<(), Box<std::error::Error>> { + let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); + let listen_addr = listen_addr.parse::<SocketAddr>()?; + + let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); + let server_addr = server_addr.parse::<SocketAddr>()?; + + // Create a TCP listener which will listen for incoming connections. + let socket = TcpListener::bind(&listen_addr)?; + println!("Listening on: {}", listen_addr); + println!("Proxying to: {}", server_addr); + + let done = socket + .incoming() + .map_err(|e| println!("error accepting socket; error = {:?}", e)) + .for_each(move |client| { + let server = TcpStream::connect(&server_addr); + let amounts = server.and_then(move |server| { + // Create separate read/write handles for the TCP clients that we're + // proxying data between. Note that typically you'd use + // `AsyncRead::split` for this operation, but we want our writer + // handles to have a custom implementation of `shutdown` which + // actually calls `TcpStream::shutdown` to ensure that EOF is + // transmitted properly across the proxied connection. + // + // As a result, we wrap up our client/server manually in arcs and + // use the impls below on our custom `MyTcpStream` type. + let client_reader = MyTcpStream(Arc::new(Mutex::new(client))); + let client_writer = client_reader.clone(); + let server_reader = MyTcpStream(Arc::new(Mutex::new(server))); + let server_writer = server_reader.clone(); + + // Copy the data (in parallel) between the client and the server. + // After the copy is done we indicate to the remote side that we've + // finished by shutting down the connection. + let client_to_server = copy(client_reader, server_writer) + .and_then(|(n, _, server_writer)| shutdown(server_writer).map(move |_| n)); + + let server_to_client = copy(server_reader, client_writer) + .and_then(|(n, _, client_writer)| shutdown(client_writer).map(move |_| n)); + + client_to_server.join(server_to_client) + }); + + let msg = amounts + .map(move |(from_client, from_server)| { + println!( + "client wrote {} bytes and received {} bytes", + from_client, from_server + ); + }) + .map_err(|e| { + // Don't panic. Maybe the client just disconnected too soon. + println!("error: {}", e); + }); + + tokio::spawn(msg); + + Ok(()) + }); + + tokio::run(done); + Ok(()) +} + +// This is a custom type used to have a custom implementation of the +// `AsyncWrite::shutdown` method which actually calls `TcpStream::shutdown` to +// notify the remote end that we're done writing. +#[derive(Clone)] +struct MyTcpStream(Arc<Mutex<TcpStream>>); + +impl Read for MyTcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.lock().unwrap().read(buf) + } +} + +impl Write for MyTcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.lock().unwrap().write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncRead for MyTcpStream {} + +impl AsyncWrite for MyTcpStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.lock().unwrap().shutdown(Shutdown::Write)?; + Ok(().into()) + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/tinydb.rs b/third_party/rust/tokio-0.1.22/examples/tinydb.rs new file mode 100644 index 0000000000..11298ed133 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/tinydb.rs @@ -0,0 +1,227 @@ +//! A "tiny database" and accompanying protocol +//! +//! This example shows the usage of shared state amongst all connected clients, +//! namely a database of key/value pairs. Each connected client can send a +//! series of GET/SET commands to query the current value of a key or set the +//! value of a key. +//! +//! This example has a simple protocol you can use to interact with the server. +//! To run, first run this in one terminal window: +//! +//! cargo run --example tinydb +//! +//! and next in another windows run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! In the `connect` window you can type in commands where when you hit enter +//! you'll get a response from the server for that command. An example session +//! is: +//! +//! +//! $ cargo run --example connect 127.0.0.1:8080 +//! GET foo +//! foo = bar +//! GET FOOBAR +//! error: no key FOOBAR +//! SET FOOBAR my awesome string +//! set FOOBAR = `my awesome string`, previous: None +//! SET foo tokio +//! set foo = `tokio`, previous: Some("bar") +//! GET foo +//! foo = tokio +//! +//! Namely you can issue two forms of commands: +//! +//! * `GET $key` - this will fetch the value of `$key` from the database and +//! return it. The server's database is initially populated with the key `foo` +//! set to the value `bar` +//! * `SET $key $value` - this will set the value of `$key` to `$value`, +//! returning the previous value, if any. + +#![deny(warnings)] + +extern crate tokio; + +use std::collections::HashMap; +use std::env; +use std::io::BufReader; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +use tokio::io::{lines, write_all}; +use tokio::net::TcpListener; +use tokio::prelude::*; + +/// The in-memory database shared amongst all clients. +/// +/// This database will be shared via `Arc`, so to mutate the internal map we're +/// going to use a `Mutex` for interior mutability. +struct Database { + map: Mutex<HashMap<String, String>>, +} + +/// Possible requests our clients can send us +enum Request { + Get { key: String }, + Set { key: String, value: String }, +} + +/// Responses to the `Request` commands above +enum Response { + Value { + key: String, + value: String, + }, + Set { + key: String, + value: String, + previous: Option<String>, + }, + Error { + msg: String, + }, +} + +fn main() -> Result<(), Box<std::error::Error>> { + // Parse the address we're going to run this server on + // and set up our TCP listener to accept connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + let listener = TcpListener::bind(&addr).map_err(|_| "failed to bind")?; + println!("Listening on: {}", addr); + + // Create the shared state of this server that will be shared amongst all + // clients. We populate the initial database and then create the `Database` + // structure. Note the usage of `Arc` here which will be used to ensure that + // each independently spawned client will have a reference to the in-memory + // database. + let mut initial_db = HashMap::new(); + initial_db.insert("foo".to_string(), "bar".to_string()); + let db = Arc::new(Database { + map: Mutex::new(initial_db), + }); + + let done = listener + .incoming() + .map_err(|e| println!("error accepting socket; error = {:?}", e)) + .for_each(move |socket| { + // As with many other small examples, the first thing we'll do is + // *split* this TCP stream into two separately owned halves. This'll + // allow us to work with the read and write halves independently. + let (reader, writer) = socket.split(); + + // Since our protocol is line-based we use `tokio_io`'s `lines` utility + // to convert our stream of bytes, `reader`, into a `Stream` of lines. + let lines = lines(BufReader::new(reader)); + + // Here's where the meat of the processing in this server happens. First + // we see a clone of the database being created, which is creating a + // new reference for this connected client to use. Also note the `move` + // keyword on the closure here which moves ownership of the reference + // into the closure, which we'll need for spawning the client below. + // + // The `map` function here means that we'll run some code for all + // requests (lines) we receive from the client. The actual handling here + // is pretty simple, first we parse the request and if it's valid we + // generate a response based on the values in the database. + let db = db.clone(); + let responses = lines.map(move |line| { + let request = match Request::parse(&line) { + Ok(req) => req, + Err(e) => return Response::Error { msg: e }, + }; + + let mut db = db.map.lock().unwrap(); + match request { + Request::Get { key } => match db.get(&key) { + Some(value) => Response::Value { + key, + value: value.clone(), + }, + None => Response::Error { + msg: format!("no key {}", key), + }, + }, + Request::Set { key, value } => { + let previous = db.insert(key.clone(), value.clone()); + Response::Set { + key, + value, + previous, + } + } + } + }); + + // At this point `responses` is a stream of `Response` types which we + // now want to write back out to the client. To do that we use + // `Stream::fold` to perform a loop here, serializing each response and + // then writing it out to the client. + let writes = responses.fold(writer, |writer, response| { + let mut response = response.serialize(); + response.push('\n'); + write_all(writer, response.into_bytes()).map(|(w, _)| w) + }); + + // Like with other small servers, we'll `spawn` this client to ensure it + // runs concurrently with all other clients, for now ignoring any errors + // that we see. + let msg = writes.then(move |_| Ok(())); + + tokio::spawn(msg) + }); + + tokio::run(done); + Ok(()) +} + +impl Request { + fn parse(input: &str) -> Result<Request, String> { + let mut parts = input.splitn(3, " "); + match parts.next() { + Some("GET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("GET must be followed by a key")), + }; + if parts.next().is_some() { + return Err(format!("GET's key must not be followed by anything")); + } + Ok(Request::Get { + key: key.to_string(), + }) + } + Some("SET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("SET must be followed by a key")), + }; + let value = match parts.next() { + Some(value) => value, + None => return Err(format!("SET needs a value")), + }; + Ok(Request::Set { + key: key.to_string(), + value: value.to_string(), + }) + } + Some(cmd) => Err(format!("unknown command: {}", cmd)), + None => Err(format!("empty input")), + } + } +} + +impl Response { + fn serialize(&self) -> String { + match *self { + Response::Value { ref key, ref value } => format!("{} = {}", key, value), + Response::Set { + ref key, + ref value, + ref previous, + } => format!("set {} = `{}`, previous: {:?}", key, value, previous), + Response::Error { ref msg } => format!("error: {}", msg), + } + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs b/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs new file mode 100644 index 0000000000..cde1b79afb --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs @@ -0,0 +1,325 @@ +//! A "tiny" example of HTTP request/response handling using transports. +//! +//! This example is intended for *learning purposes* to see how various pieces +//! hook up together and how HTTP can get up and running. Note that this example +//! is written with the restriction that it *can't* use any "big" library other +//! than Tokio, if you'd like a "real world" HTTP library you likely want a +//! crate like Hyper. +//! +//! Code here is based on the `echo-threads` example and implements two paths, +//! the `/plaintext` and `/json` routes to respond with some text and json, +//! respectively. By default this will run I/O on all the cores your system has +//! available, and it doesn't support HTTP request bodies. + +#![deny(warnings)] + +extern crate bytes; +extern crate http; +extern crate httparse; +#[macro_use] +extern crate serde_derive; +extern crate serde_json; +extern crate time; +extern crate tokio; +extern crate tokio_io; + +use std::net::SocketAddr; +use std::{env, fmt, io}; + +use tokio::codec::{Decoder, Encoder}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +use bytes::BytesMut; +use http::header::HeaderValue; +use http::{Request, Response, StatusCode}; + +fn main() -> Result<(), Box<std::error::Error>> { + // Parse the arguments, bind the TCP socket we'll be listening to, spin up + // our worker threads, and start shipping sockets to those worker threads. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + let listener = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + tokio::run({ + listener + .incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(|socket| { + process(socket); + Ok(()) + }) + }); + Ok(()) +} + +fn process(socket: TcpStream) { + let (tx, rx) = + // Frame the socket using the `Http` protocol. This maps the TCP socket + // to a Stream + Sink of HTTP frames. + Http.framed(socket) + // This splits a single `Stream + Sink` value into two separate handles + // that can be used independently (even on different tasks or threads). + .split(); + + // Map all requests into responses and send them back to the client. + let task = tx.send_all(rx.and_then(respond)).then(|res| { + if let Err(e) = res { + println!("failed to process connection; error = {:?}", e); + } + + Ok(()) + }); + + // Spawn the task that handles the connection. + tokio::spawn(task); +} + +/// "Server logic" is implemented in this function. +/// +/// This function is a map from and HTTP request to a future of a response and +/// represents the various handling a server might do. Currently the contents +/// here are pretty uninteresting. +fn respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error> + Send> { + let f = future::lazy(move || { + let mut response = Response::builder(); + let body = match req.uri().path() { + "/plaintext" => { + response.header("Content-Type", "text/plain"); + "Hello, World!".to_string() + } + "/json" => { + response.header("Content-Type", "application/json"); + + #[derive(Serialize)] + struct Message { + message: &'static str, + } + serde_json::to_string(&Message { + message: "Hello, World!", + })? + } + _ => { + response.status(StatusCode::NOT_FOUND); + String::new() + } + }; + let response = response + .body(body) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + Ok(response) + }); + + Box::new(f) +} + +struct Http; + +/// Implementation of encoding an HTTP response into a `BytesMut`, basically +/// just writing out an HTTP/1.1 response. +impl Encoder for Http { + type Item = Response<String>; + type Error = io::Error; + + fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> { + use std::fmt::Write; + + write!( + BytesWrite(dst), + "\ + HTTP/1.1 {}\r\n\ + Server: Example\r\n\ + Content-Length: {}\r\n\ + Date: {}\r\n\ + ", + item.status(), + item.body().len(), + date::now() + ) + .unwrap(); + + for (k, v) in item.headers() { + dst.extend_from_slice(k.as_str().as_bytes()); + dst.extend_from_slice(b": "); + dst.extend_from_slice(v.as_bytes()); + dst.extend_from_slice(b"\r\n"); + } + + dst.extend_from_slice(b"\r\n"); + dst.extend_from_slice(item.body().as_bytes()); + + return Ok(()); + + // Right now `write!` on `Vec<u8>` goes through io::Write and is not + // super speedy, so inline a less-crufty implementation here which + // doesn't go through io::Error. + struct BytesWrite<'a>(&'a mut BytesMut); + + impl<'a> fmt::Write for BytesWrite<'a> { + fn write_str(&mut self, s: &str) -> fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) + } + + fn write_fmt(&mut self, args: fmt::Arguments) -> fmt::Result { + fmt::write(self, args) + } + } + } +} + +/// Implementation of decoding an HTTP request from the bytes we've read so far. +/// This leverages the `httparse` crate to do the actual parsing and then we use +/// that information to construct an instance of a `http::Request` object, +/// trying to avoid allocations where possible. +impl Decoder for Http { + type Item = Request<()>; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> { + // TODO: we should grow this headers array if parsing fails and asks + // for more headers + let mut headers = [None; 16]; + let (method, path, version, amt) = { + let mut parsed_headers = [httparse::EMPTY_HEADER; 16]; + let mut r = httparse::Request::new(&mut parsed_headers); + let status = r.parse(src).map_err(|e| { + let msg = format!("failed to parse http request: {:?}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let amt = match status { + httparse::Status::Complete(amt) => amt, + httparse::Status::Partial => return Ok(None), + }; + + let toslice = |a: &[u8]| { + let start = a.as_ptr() as usize - src.as_ptr() as usize; + assert!(start < src.len()); + (start, start + a.len()) + }; + + for (i, header) in r.headers.iter().enumerate() { + let k = toslice(header.name.as_bytes()); + let v = toslice(header.value); + headers[i] = Some((k, v)); + } + + ( + toslice(r.method.unwrap().as_bytes()), + toslice(r.path.unwrap().as_bytes()), + r.version.unwrap(), + amt, + ) + }; + if version != 1 { + return Err(io::Error::new( + io::ErrorKind::Other, + "only HTTP/1.1 accepted", + )); + } + let data = src.split_to(amt).freeze(); + let mut ret = Request::builder(); + ret.method(&data[method.0..method.1]); + ret.uri(data.slice(path.0, path.1)); + ret.version(http::Version::HTTP_11); + for header in headers.iter() { + let (k, v) = match *header { + Some((ref k, ref v)) => (k, v), + None => break, + }; + let value = unsafe { HeaderValue::from_shared_unchecked(data.slice(v.0, v.1)) }; + ret.header(&data[k.0..k.1], value); + } + + let req = ret + .body(()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + Ok(Some(req)) + } +} + +mod date { + use std::cell::RefCell; + use std::fmt::{self, Write}; + use std::str; + + use time::{self, Duration}; + + pub struct Now(()); + + /// Returns a struct, which when formatted, renders an appropriate `Date` + /// header value. + pub fn now() -> Now { + Now(()) + } + + // Gee Alex, doesn't this seem like premature optimization. Well you see + // there Billy, you're absolutely correct! If your server is *bottlenecked* + // on rendering the `Date` header, well then boy do I have news for you, you + // don't need this optimization. + // + // In all seriousness, though, a simple "hello world" benchmark which just + // sends back literally "hello world" with standard headers actually is + // bottlenecked on rendering a date into a byte buffer. Since it was at the + // top of a profile, and this was done for some competitive benchmarks, this + // module was written. + // + // Just to be clear, though, I was not intending on doing this because it + // really does seem kinda absurd, but it was done by someone else [1], so I + // blame them! :) + // + // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66 + + struct LastRenderedNow { + bytes: [u8; 128], + amt: usize, + next_update: time::Timespec, + } + + thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow { + bytes: [0; 128], + amt: 0, + next_update: time::Timespec::new(0, 0), + })); + + impl fmt::Display for Now { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + LAST.with(|cache| { + let mut cache = cache.borrow_mut(); + let now = time::get_time(); + if now >= cache.next_update { + cache.update(now); + } + f.write_str(cache.buffer()) + }) + } + } + + impl LastRenderedNow { + fn buffer(&self) -> &str { + str::from_utf8(&self.bytes[..self.amt]).unwrap() + } + + fn update(&mut self, now: time::Timespec) { + self.amt = 0; + write!(LocalBuffer(self), "{}", time::at(now).rfc822()).unwrap(); + self.next_update = now + Duration::seconds(1); + self.next_update.nsec = 0; + } + } + + struct LocalBuffer<'a>(&'a mut LastRenderedNow); + + impl<'a> fmt::Write for LocalBuffer<'a> { + fn write_str(&mut self, s: &str) -> fmt::Result { + let start = self.0.amt; + let end = start + s.len(); + self.0.bytes[start..end].copy_from_slice(s.as_bytes()); + self.0.amt += s.len(); + Ok(()) + } + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/udp-client.rs b/third_party/rust/tokio-0.1.22/examples/udp-client.rs new file mode 100644 index 0000000000..900d3616df --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/udp-client.rs @@ -0,0 +1,70 @@ +//! A UDP client that just sends everything it gets via `stdio` in a single datagram, and then +//! waits for a reply. +//! +//! For the reasons of simplicity data from `stdio` is read until `EOF` in a blocking manner. +//! +//! You can test this out by running an echo server: +//! +//! ``` +//! $ cargo run --example echo-udp -- 127.0.0.1:8080 +//! ``` +//! +//! and running the client in another terminal: +//! +//! ``` +//! $ cargo run --example udp-client +//! ``` +//! +//! You can optionally provide any custom endpoint address for the client: +//! +//! ``` +//! $ cargo run --example udp-client -- 127.0.0.1:8080 +//! ``` +//! +//! Don't forget to pass `EOF` to the standard input of the client! +//! +//! Please mind that since the UDP protocol doesn't have any capabilities to detect a broken +//! connection the server needs to be run first, otherwise the client will block forever. + +extern crate futures; +extern crate tokio; + +use std::env; +use std::io::stdin; +use std::net::SocketAddr; +use tokio::net::UdpSocket; +use tokio::prelude::*; + +fn get_stdin_data() -> Result<Vec<u8>, Box<std::error::Error>> { + let mut buf = Vec::new(); + stdin().read_to_end(&mut buf)?; + Ok(buf) +} + +fn main() -> Result<(), Box<std::error::Error>> { + let remote_addr: SocketAddr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".into()) + .parse()?; + // We use port 0 to let the operating system allocate an available port for us. + let local_addr: SocketAddr = if remote_addr.is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + } + .parse()?; + let socket = UdpSocket::bind(&local_addr)?; + const MAX_DATAGRAM_SIZE: usize = 65_507; + socket + .send_dgram(get_stdin_data()?, &remote_addr) + .and_then(|(socket, _)| socket.recv_dgram(vec![0u8; MAX_DATAGRAM_SIZE])) + .map(|(_, data, len, _)| { + println!( + "Received {} bytes:\n{}", + len, + String::from_utf8_lossy(&data[..len]) + ) + }) + .wait()?; + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/udp-codec.rs b/third_party/rust/tokio-0.1.22/examples/udp-codec.rs new file mode 100644 index 0000000000..3657d8cc17 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/udp-codec.rs @@ -0,0 +1,65 @@ +//! This example leverages `BytesCodec` to create a UDP client and server which +//! speak a custom protocol. +//! +//! Here we're using the codec from tokio-io to convert a UDP socket to a stream of +//! client messages. These messages are then processed and returned back as a +//! new message with a new destination. Overall, we then use this to construct a +//! "ping pong" pair where two sockets are sending messages back and forth. + +#![deny(warnings)] + +extern crate env_logger; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; + +use std::net::SocketAddr; + +use tokio::net::{UdpFramed, UdpSocket}; +use tokio::prelude::*; +use tokio_codec::BytesCodec; + +fn main() -> Result<(), Box<std::error::Error>> { + let _ = env_logger::init(); + + let addr: SocketAddr = "127.0.0.1:0".parse()?; + + // Bind both our sockets and then figure out what ports we got. + let a = UdpSocket::bind(&addr)?; + let b = UdpSocket::bind(&addr)?; + let b_addr = b.local_addr()?; + + // We're parsing each socket with the `BytesCodec` included in `tokio_io`, and then we + // `split` each codec into the sink/stream halves. + let (a_sink, a_stream) = UdpFramed::new(a, BytesCodec::new()).split(); + let (b_sink, b_stream) = UdpFramed::new(b, BytesCodec::new()).split(); + + // Start off by sending a ping from a to b, afterwards we just print out + // what they send us and continually send pings + // let pings = stream::iter((0..5).map(Ok)); + let a = a_sink.send(("PING".into(), b_addr)).and_then(|a_sink| { + let mut i = 0; + let a_stream = a_stream.take(4).map(move |(msg, addr)| { + i += 1; + println!("[a] recv: {}", String::from_utf8_lossy(&msg)); + (format!("PING {}", i).into(), addr) + }); + a_sink.send_all(a_stream) + }); + + // The second client we have will receive the pings from `a` and then send + // back pongs. + let b_stream = b_stream.map(|(msg, addr)| { + println!("[b] recv: {}", String::from_utf8_lossy(&msg)); + ("PONG".into(), addr) + }); + let b = b_sink.send_all(b_stream); + + // Spawn the sender of pongs and then wait for our pinger to finish. + tokio::run({ + b.join(a) + .map(|_| ()) + .map_err(|e| println!("error = {:?}", e)) + }); + Ok(()) +} |