summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.22/examples
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-0.1.22/examples
parentInitial commit. (diff)
downloadfirefox-upstream.tar.xz
firefox-upstream.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-0.1.22/examples')
-rw-r--r--third_party/rust/tokio-0.1.22/examples/README.md62
-rw-r--r--third_party/rust/tokio-0.1.22/examples/blocking.rs87
-rw-r--r--third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs172
-rw-r--r--third_party/rust/tokio-0.1.22/examples/chat-combinator.rs156
-rw-r--r--third_party/rust/tokio-0.1.22/examples/chat.rs473
-rw-r--r--third_party/rust/tokio-0.1.22/examples/connect.rs257
-rw-r--r--third_party/rust/tokio-0.1.22/examples/echo-udp.rs74
-rw-r--r--third_party/rust/tokio-0.1.22/examples/echo.rs115
-rw-r--r--third_party/rust/tokio-0.1.22/examples/hello_world.rs58
-rw-r--r--third_party/rust/tokio-0.1.22/examples/manual-runtime.rs87
-rw-r--r--third_party/rust/tokio-0.1.22/examples/print_each_packet.rs150
-rw-r--r--third_party/rust/tokio-0.1.22/examples/proxy.rs130
-rw-r--r--third_party/rust/tokio-0.1.22/examples/tinydb.rs227
-rw-r--r--third_party/rust/tokio-0.1.22/examples/tinyhttp.rs325
-rw-r--r--third_party/rust/tokio-0.1.22/examples/udp-client.rs70
-rw-r--r--third_party/rust/tokio-0.1.22/examples/udp-codec.rs65
16 files changed, 2508 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/examples/README.md b/third_party/rust/tokio-0.1.22/examples/README.md
new file mode 100644
index 0000000000..ac9e9b42ff
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/README.md
@@ -0,0 +1,62 @@
+## Examples of how to use Tokio
+
+This directory contains a number of examples showcasing various capabilities of
+the `tokio` crate.
+
+All examples can be executed with:
+
+```
+cargo run --example $name
+```
+
+A high level description of each example is:
+
+* [`hello_world`](hello_world.rs) - a tiny server that writes "hello world" to
+ all connected clients and then terminates the connection, should help see how
+ to create and initialize `tokio`.
+
+* [`echo`](echo.rs) - this is your standard TCP "echo server" which accepts
+ connections and then echos back any contents that are read from each connected
+ client.
+
+* [`print_each_packet`](print_each_packet.rs) - this server will create a TCP
+ listener, accept connections in a loop, and put down in the stdout everything
+ that's read off of each TCP connection.
+
+* [`echo-udp`](echo-udp.rs) - again your standard "echo server", except for UDP
+ instead of TCP. This will echo back any packets received to the original
+ sender.
+
+* [`connect`](connect.rs) - this is a `nc`-like clone which can be used to
+ interact with most other examples. The program creates a TCP connection or UDP
+ socket and sends all information read on stdin to the remote peer, displaying
+ any data received on stdout. Often quite useful when interacting with the
+ various other servers here!
+
+* [`chat`](chat.rs) - this spins up a local TCP server which will broadcast from
+ any connected client to all other connected clients. You can connect to this
+ in multiple terminals and use it to chat between the terminals.
+
+* [`chat-combinator`](chat-combinator.rs) - Similar to `chat`, but this uses a
+ much more functional programming approach using combinators.
+
+* [`proxy`](proxy.rs) - an example proxy server that will forward all connected
+ TCP clients to the remote address specified when starting the program.
+
+* [`tinyhttp`](tinyhttp.rs) - a tiny HTTP/1.1 server which doesn't support HTTP
+ request bodies showcasing running on multiple cores, working with futures and
+ spawning tasks, and finally framing a TCP connection to discrete
+ request/response objects.
+
+* [`tinydb`](tinydb.rs) - an in-memory database which shows sharing state
+ between all connected clients, notably the key/value store of this database.
+
+* [`udp-client`](udp-client.rs) - a simple `send_dgram`/`recv_dgram` example.
+
+* [`manual-runtime`](manual-runtime.rs) - manually composing a runtime.
+
+* [`blocking`](blocking.rs) - perform heavy computation in blocking environment.
+
+If you've got an example you'd like to see here, please feel free to open an
+issue. Otherwise if you've got an example you'd like to add, please feel free
+to make a PR!
diff --git a/third_party/rust/tokio-0.1.22/examples/blocking.rs b/third_party/rust/tokio-0.1.22/examples/blocking.rs
new file mode 100644
index 0000000000..e7d5da6c80
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/blocking.rs
@@ -0,0 +1,87 @@
+//! An example of using blocking funcion annotation.
+//!
+//! This example will create 8 "heavy computation" blocking futures and 8
+//! non-blocking futures with 4 threads core threads in runtime.
+//! Each non-blocking future will print it's id and return immideatly.
+//! Each blocking future will print it's id on start, sleep for 1000 ms, print
+//! it's id and return.
+//!
+//! Note how non-blocking threads are executed before blocking threads finish
+//! their task.
+
+extern crate tokio;
+extern crate tokio_threadpool;
+
+use std::thread;
+use std::time::Duration;
+use tokio::prelude::*;
+use tokio::runtime::Builder;
+use tokio_threadpool::blocking;
+
+/// This future blocks it's poll method for 1000 ms.
+struct BlockingFuture {
+ value: i32,
+}
+
+impl Future for BlockingFuture {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ println!("Blocking begin: {}!", self.value);
+ // Try replacing this part with commnted code
+ blocking(|| {
+ println!("Blocking part annotated: {}!", self.value);
+ thread::sleep(Duration::from_millis(1000));
+ println!("Blocking done annotated: {}!", self.value);
+ })
+ .map_err(|err| panic!("Error in blocing block: {:?}", err))
+ // println!("Blocking part annotated: {}!", self.value);
+ // thread::sleep(Duration::from_millis(1000));
+ // println!("Blocking done annotated: {}!", self.value);
+ // Ok(Async::Ready(()))
+ }
+}
+
+/// This future returns immideatly.
+struct NonBlockingFuture {
+ value: i32,
+}
+
+impl Future for NonBlockingFuture {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ println!("Non-blocking done: {}!", self.value);
+ Ok(Async::Ready(()))
+ }
+}
+
+/// This future spawns child futures.
+struct SpawningFuture;
+
+impl Future for SpawningFuture {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ for i in 0..8 {
+ let blocking_future = BlockingFuture { value: i };
+
+ tokio::spawn(blocking_future);
+ }
+ for i in 0..8 {
+ let non_blocking_future = NonBlockingFuture { value: i };
+ tokio::spawn(non_blocking_future);
+ }
+ Ok(Async::Ready(()))
+ }
+}
+
+fn main() {
+ let spawning_future = SpawningFuture;
+
+ let runtime = Builder::new().core_threads(4).build().unwrap();
+ runtime.block_on_all(spawning_future).unwrap();
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs b/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs
new file mode 100644
index 0000000000..ee147025d2
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs
@@ -0,0 +1,172 @@
+//! A chat server that broadcasts a message to all connections.
+//!
+//! This is a line-based server which accepts connections, reads lines from
+//! those connections, and broadcasts the lines to all other connected clients.
+//!
+//! This example is similar to chat.rs, but uses combinators and a much more
+//! functional style.
+//!
+//! Because we are here running the reactor/executor on the same thread instead
+//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use
+//! Rc + RefCell instead. The max performance is however limited to a CPU HW
+//! thread.
+//!
+//! You can test this out by running:
+//!
+//! cargo run --example chat-combinator-current-thread
+//!
+//! And then in another window run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! You can run the second command in multiple windows and then chat between the
+//! two, seeing the messages from the other client as they're received. For all
+//! connected clients they'll all join the same room and see everyone else's
+//! messages.
+
+#![deny(warnings)]
+
+extern crate futures;
+extern crate tokio;
+
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+use tokio::runtime::current_thread::{Runtime, TaskExecutor};
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::env;
+use std::io::BufReader;
+use std::iter;
+use std::rc::Rc;
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ let mut runtime = Runtime::new().unwrap();
+
+ // Create the TCP listener we'll accept connections on.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse()?;
+
+ let socket = TcpListener::bind(&addr)?;
+ println!("Listening on: {}", addr);
+
+ // This is running on the Tokio current_thread runtime, so it will be single-
+ // threaded. The `Rc<RefCell<...>>` allows state to be shared across the tasks.
+ let connections = Rc::new(RefCell::new(HashMap::new()));
+
+ // The server task asynchronously iterates over and processes each incoming
+ // connection.
+ let srv = socket
+ .incoming()
+ .map_err(|e| {
+ println!("failed to accept socket; error = {:?}", e);
+ e
+ })
+ .for_each(move |stream| {
+ // The client's socket address
+ let addr = stream.peer_addr()?;
+
+ println!("New Connection: {}", addr);
+
+ // Split the TcpStream into two separate handles. One handle for reading
+ // and one handle for writing. This lets us use separate tasks for
+ // reading and writing.
+ let (reader, writer) = stream.split();
+
+ // Create a channel for our stream, which other sockets will use to
+ // send us messages. Then register our address with the stream to send
+ // data to us.
+ let (tx, rx) = futures::sync::mpsc::unbounded();
+ let mut conns = connections.borrow_mut();
+ conns.insert(addr, tx);
+
+ // Define here what we do for the actual I/O. That is, read a bunch of
+ // lines from the socket and dispatch them while we also write any lines
+ // from other sockets.
+ let connections_inner = connections.clone();
+ let reader = BufReader::new(reader);
+
+ // Model the read portion of this socket by mapping an infinite
+ // iterator to each line off the socket. This "loop" is then
+ // terminated with an error once we hit EOF on the socket.
+ let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
+
+ let socket_reader = iter.fold(reader, move |reader, _| {
+ // Read a line off the socket, failing if we're at EOF
+ let line = io::read_until(reader, b'\n', Vec::new());
+ let line = line.and_then(|(reader, vec)| {
+ if vec.len() == 0 {
+ Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
+ } else {
+ Ok((reader, vec))
+ }
+ });
+
+ // Convert the bytes we read into a string, and then send that
+ // string to all other connected clients.
+ let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
+
+ // Move the connection state into the closure below.
+ let connections = connections_inner.clone();
+
+ line.map(move |(reader, message)| {
+ println!("{}: {:?}", addr, message);
+ let mut conns = connections.borrow_mut();
+
+ if let Ok(msg) = message {
+ // For each open connection except the sender, send the
+ // string via the channel.
+ let iter = conns
+ .iter_mut()
+ .filter(|&(&k, _)| k != addr)
+ .map(|(_, v)| v);
+ for tx in iter {
+ tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
+ }
+ } else {
+ let tx = conns.get_mut(&addr).unwrap();
+ tx.unbounded_send("You didn't send valid UTF-8.".to_string())
+ .unwrap();
+ }
+
+ reader
+ })
+ });
+
+ // Whenever we receive a string on the Receiver, we write it to
+ // `WriteHalf<TcpStream>`.
+ let socket_writer = rx.fold(writer, |writer, msg| {
+ let amt = io::write_all(writer, msg.into_bytes());
+ let amt = amt.map(|(writer, _)| writer);
+ amt.map_err(|_| ())
+ });
+
+ // Now that we've got futures representing each half of the socket, we
+ // use the `select` combinator to wait for either half to be done to
+ // tear down the other. Then we spawn off the result.
+ let connections = connections.clone();
+ let socket_reader = socket_reader.map_err(|_| ());
+ let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
+
+ // Spawn locally a task to process the connection
+ TaskExecutor::current()
+ .spawn_local(Box::new(connection.then(move |_| {
+ let mut conns = connections.borrow_mut();
+ conns.remove(&addr);
+ println!("Connection {} closed.", addr);
+ Ok(())
+ })))
+ .unwrap();
+
+ Ok(())
+ })
+ .map_err(|err| println!("error occurred: {:?}", err));
+
+ // Spawn srv itself
+ runtime.spawn(srv);
+
+ // Execute server
+ runtime.run().unwrap();
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs b/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs
new file mode 100644
index 0000000000..b81e8f7c35
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs
@@ -0,0 +1,156 @@
+//! A chat server that broadcasts a message to all connections.
+//!
+//! This is a line-based server which accepts connections, reads lines from
+//! those connections, and broadcasts the lines to all other connected clients.
+//!
+//! This example is similar to chat.rs, but uses combinators and a much more
+//! functional style.
+//!
+//! You can test this out by running:
+//!
+//! cargo run --example chat
+//!
+//! And then in another window run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! You can run the second command in multiple windows and then chat between the
+//! two, seeing the messages from the other client as they're received. For all
+//! connected clients they'll all join the same room and see everyone else's
+//! messages.
+
+#![deny(warnings)]
+
+extern crate futures;
+extern crate tokio;
+
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+
+use std::collections::HashMap;
+use std::env;
+use std::io::BufReader;
+use std::iter;
+use std::sync::{Arc, Mutex};
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ // Create the TCP listener we'll accept connections on.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse()?;
+
+ let socket = TcpListener::bind(&addr)?;
+ println!("Listening on: {}", addr);
+
+ // This is running on the Tokio runtime, so it will be multi-threaded. The
+ // `Arc<Mutex<...>>` allows state to be shared across the threads.
+ let connections = Arc::new(Mutex::new(HashMap::new()));
+
+ // The server task asynchronously iterates over and processes each incoming
+ // connection.
+ let srv = socket
+ .incoming()
+ .map_err(|e| {
+ println!("failed to accept socket; error = {:?}", e);
+ e
+ })
+ .for_each(move |stream| {
+ // The client's socket address
+ let addr = stream.peer_addr()?;
+
+ println!("New Connection: {}", addr);
+
+ // Split the TcpStream into two separate handles. One handle for reading
+ // and one handle for writing. This lets us use separate tasks for
+ // reading and writing.
+ let (reader, writer) = stream.split();
+
+ // Create a channel for our stream, which other sockets will use to
+ // send us messages. Then register our address with the stream to send
+ // data to us.
+ let (tx, rx) = futures::sync::mpsc::unbounded();
+ connections.lock().unwrap().insert(addr, tx);
+
+ // Define here what we do for the actual I/O. That is, read a bunch of
+ // lines from the socket and dispatch them while we also write any lines
+ // from other sockets.
+ let connections_inner = connections.clone();
+ let reader = BufReader::new(reader);
+
+ // Model the read portion of this socket by mapping an infinite
+ // iterator to each line off the socket. This "loop" is then
+ // terminated with an error once we hit EOF on the socket.
+ let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
+
+ let socket_reader = iter.fold(reader, move |reader, _| {
+ // Read a line off the socket, failing if we're at EOF
+ let line = io::read_until(reader, b'\n', Vec::new());
+ let line = line.and_then(|(reader, vec)| {
+ if vec.len() == 0 {
+ Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
+ } else {
+ Ok((reader, vec))
+ }
+ });
+
+ // Convert the bytes we read into a string, and then send that
+ // string to all other connected clients.
+ let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));
+
+ // Move the connection state into the closure below.
+ let connections = connections_inner.clone();
+
+ line.map(move |(reader, message)| {
+ println!("{}: {:?}", addr, message);
+ let mut conns = connections.lock().unwrap();
+
+ if let Ok(msg) = message {
+ // For each open connection except the sender, send the
+ // string via the channel.
+ let iter = conns
+ .iter_mut()
+ .filter(|&(&k, _)| k != addr)
+ .map(|(_, v)| v);
+ for tx in iter {
+ tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
+ }
+ } else {
+ let tx = conns.get_mut(&addr).unwrap();
+ tx.unbounded_send("You didn't send valid UTF-8.".to_string())
+ .unwrap();
+ }
+
+ reader
+ })
+ });
+
+ // Whenever we receive a string on the Receiver, we write it to
+ // `WriteHalf<TcpStream>`.
+ let socket_writer = rx.fold(writer, |writer, msg| {
+ let amt = io::write_all(writer, msg.into_bytes());
+ let amt = amt.map(|(writer, _)| writer);
+ amt.map_err(|_| ())
+ });
+
+ // Now that we've got futures representing each half of the socket, we
+ // use the `select` combinator to wait for either half to be done to
+ // tear down the other. Then we spawn off the result.
+ let connections = connections.clone();
+ let socket_reader = socket_reader.map_err(|_| ());
+ let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
+
+ // Spawn a task to process the connection
+ tokio::spawn(connection.then(move |_| {
+ connections.lock().unwrap().remove(&addr);
+ println!("Connection {} closed.", addr);
+ Ok(())
+ }));
+
+ Ok(())
+ })
+ .map_err(|err| println!("error occurred: {:?}", err));
+
+ // execute server
+ tokio::run(srv);
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/chat.rs b/third_party/rust/tokio-0.1.22/examples/chat.rs
new file mode 100644
index 0000000000..b21432afa2
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/chat.rs
@@ -0,0 +1,473 @@
+//! A chat server that broadcasts a message to all connections.
+//!
+//! This example is explicitly more verbose than it has to be. This is to
+//! illustrate more concepts.
+//!
+//! A chat server for telnet clients. After a telnet client connects, the first
+//! line should contain the client's name. After that, all lines sent by a
+//! client are broadcasted to all other connected clients.
+//!
+//! Because the client is telnet, lines are delimited by "\r\n".
+//!
+//! You can test this out by running:
+//!
+//! cargo run --example chat
+//!
+//! And then in another terminal run:
+//!
+//! telnet localhost 6142
+//!
+//! You can run the `telnet` command in any number of additional windows.
+//!
+//! You can run the second command in multiple windows and then chat between the
+//! two, seeing the messages from the other client as they're received. For all
+//! connected clients they'll all join the same room and see everyone else's
+//! messages.
+
+#![deny(warnings)]
+
+extern crate tokio;
+#[macro_use]
+extern crate futures;
+extern crate bytes;
+
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::future::{self, Either};
+use futures::sync::mpsc;
+use tokio::io;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::*;
+
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::{Arc, Mutex};
+
+/// Shorthand for the transmit half of the message channel.
+type Tx = mpsc::UnboundedSender<Bytes>;
+
+/// Shorthand for the receive half of the message channel.
+type Rx = mpsc::UnboundedReceiver<Bytes>;
+
+/// Data that is shared between all peers in the chat server.
+///
+/// This is the set of `Tx` handles for all connected clients. Whenever a
+/// message is received from a client, it is broadcasted to all peers by
+/// iterating over the `peers` entries and sending a copy of the message on each
+/// `Tx`.
+struct Shared {
+ peers: HashMap<SocketAddr, Tx>,
+}
+
+/// The state for each connected client.
+struct Peer {
+ /// Name of the peer.
+ ///
+ /// When a client connects, the first line sent is treated as the client's
+ /// name (like alice or bob). The name is used to preface all messages that
+ /// arrive from the client so that we can simulate a real chat server:
+ ///
+ /// ```text
+ /// alice: Hello everyone.
+ /// bob: Welcome to telnet chat!
+ /// ```
+ name: BytesMut,
+
+ /// The TCP socket wrapped with the `Lines` codec, defined below.
+ ///
+ /// This handles sending and receiving data on the socket. When using
+ /// `Lines`, we can work at the line level instead of having to manage the
+ /// raw byte operations.
+ lines: Lines,
+
+ /// Handle to the shared chat state.
+ ///
+ /// This is used to broadcast messages read off the socket to all connected
+ /// peers.
+ state: Arc<Mutex<Shared>>,
+
+ /// Receive half of the message channel.
+ ///
+ /// This is used to receive messages from peers. When a message is received
+ /// off of this `Rx`, it will be written to the socket.
+ rx: Rx,
+
+ /// Client socket address.
+ ///
+ /// The socket address is used as the key in the `peers` HashMap. The
+ /// address is saved so that the `Peer` drop implementation can clean up its
+ /// entry.
+ addr: SocketAddr,
+}
+
+/// Line based codec
+///
+/// This decorates a socket and presents a line based read / write interface.
+///
+/// As a user of `Lines`, we can focus on working at the line level. So, we send
+/// and receive values that represent entire lines. The `Lines` codec will
+/// handle the encoding and decoding as well as reading from and writing to the
+/// socket.
+#[derive(Debug)]
+struct Lines {
+ /// The TCP socket.
+ socket: TcpStream,
+
+ /// Buffer used when reading from the socket. Data is not returned from this
+ /// buffer until an entire line has been read.
+ rd: BytesMut,
+
+ /// Buffer used to stage data before writing it to the socket.
+ wr: BytesMut,
+}
+
+impl Shared {
+ /// Create a new, empty, instance of `Shared`.
+ fn new() -> Self {
+ Shared {
+ peers: HashMap::new(),
+ }
+ }
+}
+
+impl Peer {
+ /// Create a new instance of `Peer`.
+ fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer {
+ // Get the client socket address
+ let addr = lines.socket.peer_addr().unwrap();
+
+ // Create a channel for this peer
+ let (tx, rx) = mpsc::unbounded();
+
+ // Add an entry for this `Peer` in the shared state map.
+ state.lock().unwrap().peers.insert(addr, tx);
+
+ Peer {
+ name,
+ lines,
+ state,
+ rx,
+ addr,
+ }
+ }
+}
+
+/// This is where a connected client is managed.
+///
+/// A `Peer` is also a future representing completely processing the client.
+///
+/// When a `Peer` is created, the first line (representing the client's name)
+/// has already been read. When the socket closes, the `Peer` future completes.
+///
+/// While processing, the peer future implementation will:
+///
+/// 1) Receive messages on its message channel and write them to the socket.
+/// 2) Receive messages from the socket and broadcast them to all peers.
+///
+impl Future for Peer {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ // Tokio (and futures) use cooperative scheduling without any
+ // preemption. If a task never yields execution back to the executor,
+ // then other tasks may be starved.
+ //
+ // To deal with this, robust applications should not have any unbounded
+ // loops. In this example, we will read at most `LINES_PER_TICK` lines
+ // from the client on each tick.
+ //
+ // If the limit is hit, the current task is notified, informing the
+ // executor to schedule the task again asap.
+ const LINES_PER_TICK: usize = 10;
+
+ // Receive all messages from peers.
+ for i in 0..LINES_PER_TICK {
+ // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
+ // safe.
+ match self.rx.poll().unwrap() {
+ Async::Ready(Some(v)) => {
+ // Buffer the line. Once all lines are buffered, they will
+ // be flushed to the socket (right below).
+ self.lines.buffer(&v);
+
+ // If this is the last iteration, the loop will break even
+ // though there could still be lines to read. Because we did
+ // not reach `Async::NotReady`, we have to notify ourselves
+ // in order to tell the executor to schedule the task again.
+ if i + 1 == LINES_PER_TICK {
+ task::current().notify();
+ }
+ }
+ _ => break,
+ }
+ }
+
+ // Flush the write buffer to the socket
+ let _ = self.lines.poll_flush()?;
+
+ // Read new lines from the socket
+ while let Async::Ready(line) = self.lines.poll()? {
+ println!("Received line ({:?}) : {:?}", self.name, line);
+
+ if let Some(message) = line {
+ // Append the peer's name to the front of the line:
+ let mut line = self.name.clone();
+ line.extend_from_slice(b": ");
+ line.extend_from_slice(&message);
+ line.extend_from_slice(b"\r\n");
+
+ // We're using `Bytes`, which allows zero-copy clones (by
+ // storing the data in an Arc internally).
+ //
+ // However, before cloning, we must freeze the data. This
+ // converts it from mutable -> immutable, allowing zero copy
+ // cloning.
+ let line = line.freeze();
+
+ // Now, send the line to all other peers
+ for (addr, tx) in &self.state.lock().unwrap().peers {
+ // Don't send the message to ourselves
+ if *addr != self.addr {
+ // The send only fails if the rx half has been dropped,
+ // however this is impossible as the `tx` half will be
+ // removed from the map before the `rx` is dropped.
+ tx.unbounded_send(line.clone()).unwrap();
+ }
+ }
+ } else {
+ // EOF was reached. The remote client has disconnected. There is
+ // nothing more to do.
+ return Ok(Async::Ready(()));
+ }
+ }
+
+ // As always, it is important to not just return `NotReady` without
+ // ensuring an inner future also returned `NotReady`.
+ //
+ // We know we got a `NotReady` from either `self.rx` or `self.lines`, so
+ // the contract is respected.
+ Ok(Async::NotReady)
+ }
+}
+
+impl Drop for Peer {
+ fn drop(&mut self) {
+ self.state.lock().unwrap().peers.remove(&self.addr);
+ }
+}
+
+impl Lines {
+ /// Create a new `Lines` codec backed by the socket
+ fn new(socket: TcpStream) -> Self {
+ Lines {
+ socket,
+ rd: BytesMut::new(),
+ wr: BytesMut::new(),
+ }
+ }
+
+ /// Buffer a line.
+ ///
+ /// This writes the line to an internal buffer. Calls to `poll_flush` will
+ /// attempt to flush this buffer to the socket.
+ fn buffer(&mut self, line: &[u8]) {
+ // Ensure the buffer has capacity. Ideally this would not be unbounded,
+ // but to keep the example simple, we will not limit this.
+ self.wr.reserve(line.len());
+
+ // Push the line onto the end of the write buffer.
+ //
+ // The `put` function is from the `BufMut` trait.
+ self.wr.put(line);
+ }
+
+ /// Flush the write buffer to the socket
+ fn poll_flush(&mut self) -> Poll<(), io::Error> {
+ // As long as there is buffered data to write, try to write it.
+ while !self.wr.is_empty() {
+ // Try to write some bytes to the socket
+ let n = try_ready!(self.socket.poll_write(&self.wr));
+
+ // As long as the wr is not empty, a successful write should
+ // never write 0 bytes.
+ assert!(n > 0);
+
+ // This discards the first `n` bytes of the buffer.
+ let _ = self.wr.split_to(n);
+ }
+
+ Ok(Async::Ready(()))
+ }
+
+ /// Read data from the socket.
+ ///
+ /// This only returns `Ready` when the socket has closed.
+ fn fill_read_buf(&mut self) -> Poll<(), io::Error> {
+ loop {
+ // Ensure the read buffer has capacity.
+ //
+ // This might result in an internal allocation.
+ self.rd.reserve(1024);
+
+ // Read data into the buffer.
+ let n = try_ready!(self.socket.read_buf(&mut self.rd));
+
+ if n == 0 {
+ return Ok(Async::Ready(()));
+ }
+ }
+ }
+}
+
+impl Stream for Lines {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ // First, read any new data that might have been received off the socket
+ let sock_closed = self.fill_read_buf()?.is_ready();
+
+ // Now, try finding lines
+ let pos = self
+ .rd
+ .windows(2)
+ .enumerate()
+ .find(|&(_, bytes)| bytes == b"\r\n")
+ .map(|(i, _)| i);
+
+ if let Some(pos) = pos {
+ // Remove the line from the read buffer and set it to `line`.
+ let mut line = self.rd.split_to(pos + 2);
+
+ // Drop the trailing \r\n
+ line.split_off(pos);
+
+ // Return the line
+ return Ok(Async::Ready(Some(line)));
+ }
+
+ if sock_closed {
+ Ok(Async::Ready(None))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+/// Spawn a task to manage the socket.
+///
+/// This will read the first line from the socket to identify the client, then
+/// add the client to the set of connected peers in the chat service.
+fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
+ // Wrap the socket with the `Lines` codec that we wrote above.
+ //
+ // By doing this, we can operate at the line level instead of doing raw byte
+ // manipulation.
+ let lines = Lines::new(socket);
+
+ // The first line is treated as the client's name. The client is not added
+ // to the set of connected peers until this line is received.
+ //
+ // We use the `into_future` combinator to extract the first item from the
+ // lines stream. `into_future` takes a `Stream` and converts it to a future
+ // of `(first, rest)` where `rest` is the original stream instance.
+ let connection = lines
+ .into_future()
+ // `into_future` doesn't have the right error type, so map the error to
+ // make it work.
+ .map_err(|(e, _)| e)
+ // Process the first received line as the client's name.
+ .and_then(|(name, lines)| {
+ // If `name` is `None`, then the client disconnected without
+ // actually sending a line of data.
+ //
+ // Since the connection is closed, there is no further work that we
+ // need to do. So, we just terminate processing by returning
+ // `future::ok()`.
+ //
+ // The problem is that only a single future type can be returned
+ // from a combinator closure, but we want to return both
+ // `future::ok()` and `Peer` (below).
+ //
+ // This is a common problem, so the `futures` crate solves this by
+ // providing the `Either` helper enum that allows creating a single
+ // return type that covers two concrete future types.
+ let name = match name {
+ Some(name) => name,
+ None => {
+ // The remote client closed the connection without sending
+ // any data.
+ return Either::A(future::ok(()));
+ }
+ };
+
+ println!("`{:?}` is joining the chat", name);
+
+ // Create the peer.
+ //
+ // This is also a future that processes the connection, only
+ // completing when the socket closes.
+ let peer = Peer::new(name, state, lines);
+
+ // Wrap `peer` with `Either::B` to make the return type fit.
+ Either::B(peer)
+ })
+ // Task futures have an error of type `()`, this ensures we handle the
+ // error. We do this by printing the error to STDOUT.
+ .map_err(|e| {
+ println!("connection error = {:?}", e);
+ });
+
+ // Spawn the task. Internally, this submits the task to a thread pool.
+ tokio::spawn(connection);
+}
+
+pub fn main() -> Result<(), Box<std::error::Error>> {
+ // Create the shared state. This is how all the peers communicate.
+ //
+ // The server task will hold a handle to this. For every new client, the
+ // `state` handle is cloned and passed into the task that processes the
+ // client connection.
+ let state = Arc::new(Mutex::new(Shared::new()));
+
+ let addr = "127.0.0.1:6142".parse()?;
+
+ // Bind a TCP listener to the socket address.
+ //
+ // Note that this is the Tokio TcpListener, which is fully async.
+ let listener = TcpListener::bind(&addr)?;
+
+ // The server task asynchronously iterates over and processes each
+ // incoming connection.
+ let server = listener
+ .incoming()
+ .for_each(move |socket| {
+ // Spawn a task to process the connection
+ process(socket, state.clone());
+ Ok(())
+ })
+ .map_err(|err| {
+ // All tasks must have an `Error` type of `()`. This forces error
+ // handling and helps avoid silencing failures.
+ //
+ // In our example, we are only going to log the error to STDOUT.
+ println!("accept error = {:?}", err);
+ });
+
+ println!("server running on localhost:6142");
+
+ // Start the Tokio runtime.
+ //
+ // The Tokio is a pre-configured "out of the box" runtime for building
+ // asynchronous applications. It includes both a reactor and a task
+ // scheduler. This means applications are multithreaded by default.
+ //
+ // This function blocks until the runtime reaches an idle state. Idle is
+ // defined as all spawned tasks have completed and all I/O resources (TCP
+ // sockets in our case) have been dropped.
+ //
+ // In our example, we have not defined a shutdown strategy, so this will
+ // block until `ctrl-c` is pressed at the terminal.
+ tokio::run(server);
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/connect.rs b/third_party/rust/tokio-0.1.22/examples/connect.rs
new file mode 100644
index 0000000000..4dc0ea31e2
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/connect.rs
@@ -0,0 +1,257 @@
+//! An example of hooking up stdin/stdout to either a TCP or UDP stream.
+//!
+//! This example will connect to a socket address specified in the argument list
+//! and then forward all data read on stdin to the server, printing out all data
+//! received on stdout. An optional `--udp` argument can be passed to specify
+//! that the connection should be made over UDP instead of TCP, translating each
+//! line entered on stdin to a UDP packet to be sent to the remote address.
+//!
+//! Note that this is not currently optimized for performance, especially
+//! around buffer management. Rather it's intended to show an example of
+//! working with a client.
+//!
+//! This example can be quite useful when interacting with the other examples in
+//! this repository! Many of them recommend running this as a simple "hook up
+//! stdin/stdout to a server" to get up and running.
+
+#![deny(warnings)]
+
+extern crate bytes;
+extern crate futures;
+extern crate tokio;
+extern crate tokio_io;
+
+use std::env;
+use std::io::{self, Read, Write};
+use std::net::SocketAddr;
+use std::thread;
+
+use futures::sync::mpsc;
+use tokio::prelude::*;
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ // Determine if we're going to run in TCP or UDP mode
+ let mut args = env::args().skip(1).collect::<Vec<_>>();
+ let tcp = match args.iter().position(|a| a == "--udp") {
+ Some(i) => {
+ args.remove(i);
+ false
+ }
+ None => true,
+ };
+
+ // Parse what address we're going to connect to
+ let addr = match args.first() {
+ Some(addr) => addr,
+ None => Err("this program requires at least one argument")?,
+ };
+ let addr = addr.parse::<SocketAddr>()?;
+
+ // Right now Tokio doesn't support a handle to stdin running on the event
+ // loop, so we farm out that work to a separate thread. This thread will
+ // read data (with blocking I/O) from stdin and then send it to the event
+ // loop over a standard futures channel.
+ let (stdin_tx, stdin_rx) = mpsc::channel(0);
+ thread::spawn(|| read_stdin(stdin_tx));
+ let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx"));
+
+ // Now that we've got our stdin read we either set up our TCP connection or
+ // our UDP connection to get a stream of bytes we're going to emit to
+ // stdout.
+ let stdout = if tcp {
+ tcp::connect(&addr, Box::new(stdin_rx))?
+ } else {
+ udp::connect(&addr, Box::new(stdin_rx))?
+ };
+
+ // And now with our stream of bytes to write to stdout, we execute that in
+ // the event loop! Note that this is doing blocking I/O to emit data to
+ // stdout, and in general it's a no-no to do that sort of work on the event
+ // loop. In this case, though, we know it's ok as the event loop isn't
+ // otherwise running anything useful.
+ let mut out = io::stdout();
+
+ tokio::run({
+ stdout
+ .for_each(move |chunk| out.write_all(&chunk))
+ .map_err(|e| println!("error reading stdout; error = {:?}", e))
+ });
+ Ok(())
+}
+
+mod codec {
+ use bytes::{BufMut, BytesMut};
+ use std::io;
+ use tokio::codec::{Decoder, Encoder};
+
+ /// A simple `Codec` implementation that just ships bytes around.
+ ///
+ /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
+ /// just a convenient method for us to work with streams/sinks for now.
+ /// This'll just take any data read and interpret it as a "frame" and
+ /// conversely just shove data into the output location without looking at
+ /// it.
+ pub struct Bytes;
+
+ impl Decoder for Bytes {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+ }
+
+ impl Encoder for Bytes {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
+ buf.put(&data[..]);
+ Ok(())
+ }
+ }
+}
+
+mod tcp {
+ use tokio;
+ use tokio::codec::Decoder;
+ use tokio::net::TcpStream;
+ use tokio::prelude::*;
+
+ use bytes::BytesMut;
+ use codec::Bytes;
+
+ use std::error::Error;
+ use std::io;
+ use std::net::SocketAddr;
+
+ pub fn connect(
+ addr: &SocketAddr,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
+ ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
+ let tcp = TcpStream::connect(addr);
+
+ // After the TCP connection has been established, we set up our client
+ // to start forwarding data.
+ //
+ // First we use the `Io::framed` method with a simple implementation of
+ // a `Codec` (listed below) that just ships bytes around. We then split
+ // that in two to work with the stream and sink separately.
+ //
+ // Half of the work we're going to do is to take all data we receive on
+ // `stdin` and send that along the TCP stream (`sink`). The second half
+ // is to take all the data we receive (`stream`) and then write that to
+ // stdout. We'll be passing this handle back out from this method.
+ //
+ // You'll also note that we *spawn* the work to read stdin and write it
+ // to the TCP stream. This is done to ensure that happens concurrently
+ // with us reading data from the stream.
+ let stream = Box::new(
+ tcp.map(move |stream| {
+ let (sink, stream) = Bytes.framed(stream).split();
+
+ tokio::spawn(stdin.forward(sink).then(|result| {
+ if let Err(e) = result {
+ println!("failed to write to socket: {}", e)
+ }
+ Ok(())
+ }));
+
+ stream
+ })
+ .flatten_stream(),
+ );
+ Ok(stream)
+ }
+}
+
+mod udp {
+ use std::error::Error;
+ use std::io;
+ use std::net::SocketAddr;
+
+ use bytes::BytesMut;
+ use tokio;
+ use tokio::net::{UdpFramed, UdpSocket};
+ use tokio::prelude::*;
+
+ use codec::Bytes;
+
+ pub fn connect(
+ &addr: &SocketAddr,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
+ ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
+ // We'll bind our UDP socket to a local IP/port, but for now we
+ // basically let the OS pick both of those.
+ let addr_to_bind = if addr.ip().is_ipv4() {
+ "0.0.0.0:0".parse()?
+ } else {
+ "[::]:0".parse()?
+ };
+ let udp = match UdpSocket::bind(&addr_to_bind) {
+ Ok(udp) => udp,
+ Err(_) => Err("failed to bind socket")?,
+ };
+
+ // Like above with TCP we use an instance of `Bytes` codec to transform
+ // this UDP socket into a framed sink/stream which operates over
+ // discrete values. In this case we're working with *pairs* of socket
+ // addresses and byte buffers.
+ let (sink, stream) = UdpFramed::new(udp, Bytes).split();
+
+ // All bytes from `stdin` will go to the `addr` specified in our
+ // argument list. Like with TCP this is spawned concurrently
+ let forward_stdin = stdin
+ .map(move |chunk| (chunk, addr))
+ .forward(sink)
+ .then(|result| {
+ if let Err(e) = result {
+ println!("failed to write to socket: {}", e)
+ }
+ Ok(())
+ });
+
+ // With UDP we could receive data from any source, so filter out
+ // anything coming from a different address
+ let receive = stream.filter_map(move |(chunk, src)| {
+ if src == addr {
+ Some(chunk.into())
+ } else {
+ None
+ }
+ });
+
+ let stream = Box::new(
+ future::lazy(|| {
+ tokio::spawn(forward_stdin);
+ future::ok(receive)
+ })
+ .flatten_stream(),
+ );
+ Ok(stream)
+ }
+}
+
+// Our helper method which will read data from stdin and send it along the
+// sender provided.
+fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
+ let mut stdin = io::stdin();
+ loop {
+ let mut buf = vec![0; 1024];
+ let n = match stdin.read(&mut buf) {
+ Err(_) | Ok(0) => break,
+ Ok(n) => n,
+ };
+ buf.truncate(n);
+ tx = match tx.send(buf).wait() {
+ Ok(tx) => tx,
+ Err(_) => break,
+ };
+ }
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/echo-udp.rs b/third_party/rust/tokio-0.1.22/examples/echo-udp.rs
new file mode 100644
index 0000000000..93ebca799d
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/echo-udp.rs
@@ -0,0 +1,74 @@
+//! An UDP echo server that just sends back everything that it receives.
+//!
+//! If you're on Unix you can test this out by in one terminal executing:
+//!
+//! cargo run --example echo-udp
+//!
+//! and in another terminal you can run:
+//!
+//! cargo run --example connect -- --udp 127.0.0.1:8080
+//!
+//! Each line you type in to the `nc` terminal should be echo'd back to you!
+
+#![deny(warnings)]
+
+#[macro_use]
+extern crate futures;
+extern crate tokio;
+
+use std::net::SocketAddr;
+use std::{env, io};
+
+use tokio::net::UdpSocket;
+use tokio::prelude::*;
+
+struct Server {
+ socket: UdpSocket,
+ buf: Vec<u8>,
+ to_send: Option<(usize, SocketAddr)>,
+}
+
+impl Future for Server {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ loop {
+ // First we check to see if there's a message we need to echo back.
+ // If so then we try to send it back to the original source, waiting
+ // until it's writable and we're able to do so.
+ if let Some((size, peer)) = self.to_send {
+ let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
+ println!("Echoed {}/{} bytes to {}", amt, size, peer);
+ self.to_send = None;
+ }
+
+ // If we're here then `to_send` is `None`, so we take a look for the
+ // next message we're going to echo back.
+ self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
+ }
+ }
+}
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse::<SocketAddr>()?;
+
+ let socket = UdpSocket::bind(&addr)?;
+ println!("Listening on: {}", socket.local_addr()?);
+
+ let server = Server {
+ socket: socket,
+ buf: vec![0; 1024],
+ to_send: None,
+ };
+
+ // This starts the server task.
+ //
+ // `map_err` handles the error by logging it and maps the future to a type
+ // that can be spawned.
+ //
+ // `tokio::run` spawns the task on the Tokio runtime and starts running.
+ tokio::run(server.map_err(|e| println!("server error = {:?}", e)));
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/echo.rs b/third_party/rust/tokio-0.1.22/examples/echo.rs
new file mode 100644
index 0000000000..45f808f89d
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/echo.rs
@@ -0,0 +1,115 @@
+//! A "hello world" echo server with Tokio
+//!
+//! This server will create a TCP listener, accept connections in a loop, and
+//! write back everything that's read off of each TCP connection.
+//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
+//! To see this server in action, you can run this in one terminal:
+//!
+//! cargo run --example echo
+//!
+//! and in another terminal you can run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! Each line you type in to the `connect` terminal should be echo'd back to
+//! you! If you open up multiple terminals running the `connect` example you
+//! should be able to see them all make progress simultaneously.
+
+#![deny(warnings)]
+
+extern crate tokio;
+
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+
+use std::env;
+use std::net::SocketAddr;
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ // Allow passing an address to listen on as the first argument of this
+ // program, but otherwise we'll just set up our TCP listener on
+ // 127.0.0.1:8080 for connections.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse::<SocketAddr>()?;
+
+ // Next up we create a TCP listener which will listen for incoming
+ // connections. This TCP listener is bound to the address we determined
+ // above and must be associated with an event loop, so we pass in a handle
+ // to our event loop. After the socket's created we inform that we're ready
+ // to go and start accepting connections.
+ let socket = TcpListener::bind(&addr)?;
+ println!("Listening on: {}", addr);
+
+ // Here we convert the `TcpListener` to a stream of incoming connections
+ // with the `incoming` method. We then define how to process each element in
+ // the stream with the `for_each` method.
+ //
+ // This combinator, defined on the `Stream` trait, will allow us to define a
+ // computation to happen for all items on the stream (in this case TCP
+ // connections made to the server). The return value of the `for_each`
+ // method is itself a future representing processing the entire stream of
+ // connections, and ends up being our server.
+ let done = socket
+ .incoming()
+ .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+ .for_each(move |socket| {
+ // Once we're inside this closure this represents an accepted client
+ // from our server. The `socket` is the client connection (similar to
+ // how the standard library operates).
+ //
+ // We just want to copy all data read from the socket back onto the
+ // socket itself (e.g. "echo"). We can use the standard `io::copy`
+ // combinator in the `tokio-core` crate to do precisely this!
+ //
+ // The `copy` function takes two arguments, where to read from and where
+ // to write to. We only have one argument, though, with `socket`.
+ // Luckily there's a method, `Io::split`, which will split an Read/Write
+ // stream into its two halves. This operation allows us to work with
+ // each stream independently, such as pass them as two arguments to the
+ // `copy` function.
+ //
+ // The `copy` function then returns a future, and this future will be
+ // resolved when the copying operation is complete, resolving to the
+ // amount of data that was copied.
+ let (reader, writer) = socket.split();
+ let amt = io::copy(reader, writer);
+
+ // After our copy operation is complete we just print out some helpful
+ // information.
+ let msg = amt.then(move |result| {
+ match result {
+ Ok((amt, _, _)) => println!("wrote {} bytes", amt),
+ Err(e) => println!("error: {}", e),
+ }
+
+ Ok(())
+ });
+
+ // And this is where much of the magic of this server happens. We
+ // crucially want all clients to make progress concurrently, rather than
+ // blocking one on completion of another. To achieve this we use the
+ // `tokio::spawn` function to execute the work in the background.
+ //
+ // This function will transfer ownership of the future (`msg` in this
+ // case) to the Tokio runtime thread pool that. The thread pool will
+ // drive the future to completion.
+ //
+ // Essentially here we're executing a new task to run concurrently,
+ // which will allow all of our clients to be processed concurrently.
+ tokio::spawn(msg)
+ });
+
+ // And finally now that we've define what our server is, we run it!
+ //
+ // This starts the Tokio runtime, spawns the server task, and blocks the
+ // current thread until all tasks complete execution. Since the `done` task
+ // never completes (it just keeps accepting sockets), `tokio::run` blocks
+ // forever (until ctrl-c is pressed).
+ tokio::run(done);
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/hello_world.rs b/third_party/rust/tokio-0.1.22/examples/hello_world.rs
new file mode 100644
index 0000000000..c82762691a
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/hello_world.rs
@@ -0,0 +1,58 @@
+//! Hello world server.
+//!
+//! A simple client that opens a TCP stream, writes "hello world\n", and closes
+//! the connection.
+//!
+//! You can test this out by running:
+//!
+//! ncat -l 6142
+//!
+//! And then in another terminal run:
+//!
+//! cargo run --example hello_world
+
+#![deny(warnings)]
+
+extern crate tokio;
+
+use tokio::io;
+use tokio::net::TcpStream;
+use tokio::prelude::*;
+
+pub fn main() -> Result<(), Box<std::error::Error>> {
+ let addr = "127.0.0.1:6142".parse()?;
+
+ // Open a TCP stream to the socket address.
+ //
+ // Note that this is the Tokio TcpStream, which is fully async.
+ let client = TcpStream::connect(&addr)
+ .and_then(|stream| {
+ println!("created stream");
+ io::write_all(stream, "hello world\n").then(|result| {
+ println!("wrote to stream; success={:?}", result.is_ok());
+ Ok(())
+ })
+ })
+ .map_err(|err| {
+ // All tasks must have an `Error` type of `()`. This forces error
+ // handling and helps avoid silencing failures.
+ //
+ // In our example, we are only going to log the error to STDOUT.
+ println!("connection error = {:?}", err);
+ });
+
+ // Start the Tokio runtime.
+ //
+ // The Tokio is a pre-configured "out of the box" runtime for building
+ // asynchronous applications. It includes both a reactor and a task
+ // scheduler. This means applications are multithreaded by default.
+ //
+ // This function blocks until the runtime reaches an idle state. Idle is
+ // defined as all spawned tasks have completed and all I/O resources (TCP
+ // sockets in our case) have been dropped.
+ println!("About to create the stream and write to it...");
+ tokio::run(client);
+ println!("Stream has been created and written to.");
+
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs b/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs
new file mode 100644
index 0000000000..8e3e129965
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs
@@ -0,0 +1,87 @@
+//! An example how to manually assemble a runtime and run some tasks on it.
+//!
+//! This is closer to the single-threaded runtime than the default tokio one, as it is simpler to
+//! grasp. There are conceptually similar, but the multi-threaded one would be more code. If you
+//! just want to *use* a single-threaded runtime, use the one provided by tokio directly
+//! (`tokio::runtime::current_thread::Runtime::new()`. This is a demonstration only.
+//!
+//! Note that the error handling is a bit left out. Also, the `run` could be modified to return the
+//! result of the provided future.
+
+extern crate futures;
+extern crate tokio;
+extern crate tokio_current_thread;
+extern crate tokio_executor;
+extern crate tokio_reactor;
+extern crate tokio_timer;
+
+use std::io::Error as IoError;
+use std::time::{Duration, Instant};
+
+use futures::{future, Future};
+use tokio_current_thread::CurrentThread;
+use tokio_reactor::Reactor;
+use tokio_timer::timer::{self, Timer};
+
+/// Creates a "runtime".
+///
+/// This is similar to running `tokio::runtime::current_thread::Runtime::new()`.
+fn run<F: Future<Item = (), Error = ()>>(f: F) -> Result<(), IoError> {
+ // We need a reactor to receive events about IO objects from kernel
+ let reactor = Reactor::new()?;
+ let reactor_handle = reactor.handle();
+ // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the
+ // reactor pick up some new external events.
+ let timer = Timer::new(reactor);
+ let timer_handle = timer.handle();
+ // And now put a single-threaded executor on top of the timer. When there are no futures ready
+ // to do something, it'll let the timer or the reactor generate some new stimuli for the
+ // futures to continue in their life.
+ let mut executor = CurrentThread::new_with_park(timer);
+ // Binds an executor to this thread
+ let mut enter = tokio_executor::enter().expect("Multiple executors at once");
+ // This will set the default handle and timer to use inside the closure and run the future.
+ tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| {
+ timer::with_default(&timer_handle, enter, |enter| {
+ // The TaskExecutor is a fake executor that looks into the current single-threaded
+ // executor when used. This is a trick, because we need two mutable references to the
+ // executor (one to run the provided future, another to install as the default one). We
+ // use the fake one here as the default one.
+ let mut default_executor = tokio_current_thread::TaskExecutor::current();
+ tokio_executor::with_default(&mut default_executor, enter, |enter| {
+ let mut executor = executor.enter(enter);
+ // Run the provided future
+ executor.block_on(f).unwrap();
+ // Run all the other futures that are still left in the executor
+ executor.run().unwrap();
+ });
+ });
+ });
+ Ok(())
+}
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ run(future::lazy(|| {
+ // Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn().
+ // It also can use the default reactor and create timeouts.
+
+ // Connect somewhere. And then do nothing with it. Yes, useless.
+ //
+ // This will use the default reactor which runs in the current thread.
+ let connect = tokio::net::TcpStream::connect(&"127.0.0.1:53".parse().unwrap())
+ .map(|_| println!("Connected"))
+ .map_err(|e| println!("Failed to connect: {}", e));
+ // We can spawn it without requiring Send. This would panic if we run it outside of the
+ // `run` (or outside of anything else)
+ tokio_current_thread::spawn(connect);
+
+ // We can also create timeouts.
+ let deadline = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(5))
+ .map(|()| println!("5 seconds are over"))
+ .map_err(|e| println!("Failed to wait: {}", e));
+ // We can spawn on the default executor, which is also the local one.
+ tokio::executor::spawn(deadline);
+ Ok(())
+ }))?;
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs b/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs
new file mode 100644
index 0000000000..94a606483c
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs
@@ -0,0 +1,150 @@
+//! A "print-each-packet" server with Tokio
+//!
+//! This server will create a TCP listener, accept connections in a loop, and
+//! put down in the stdout everything that's read off of each TCP connection.
+//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
+//! To see this server in action, you can run this in one terminal:
+//!
+//! cargo run --example print\_each\_packet
+//!
+//! and in another terminal you can run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! Each line you type in to the `connect` terminal should be written to terminal!
+//!
+//! Minimal js example:
+//!
+//! ```js
+//! var net = require("net");
+//!
+//! var listenPort = 8080;
+//!
+//! var server = net.createServer(function (socket) {
+//! socket.on("data", function (bytes) {
+//! console.log("bytes", bytes);
+//! });
+//!
+//! socket.on("end", function() {
+//! console.log("Socket received FIN packet and closed connection");
+//! });
+//! socket.on("error", function (error) {
+//! console.log("Socket closed with error", error);
+//! });
+//!
+//! socket.on("close", function (with_error) {
+//! if (with_error) {
+//! console.log("Socket closed with result: Err(SomeError)");
+//! } else {
+//! console.log("Socket closed with result: Ok(())");
+//! }
+//! });
+//!
+//! });
+//!
+//! server.listen(listenPort);
+//!
+//! console.log("Listening on:", listenPort);
+//! ```
+//!
+
+#![deny(warnings)]
+
+extern crate tokio;
+extern crate tokio_codec;
+
+use tokio::codec::Decoder;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+use tokio_codec::BytesCodec;
+
+use std::env;
+use std::net::SocketAddr;
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ // Allow passing an address to listen on as the first argument of this
+ // program, but otherwise we'll just set up our TCP listener on
+ // 127.0.0.1:8080 for connections.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse::<SocketAddr>()?;
+
+ // Next up we create a TCP listener which will listen for incoming
+ // connections. This TCP listener is bound to the address we determined
+ // above and must be associated with an event loop, so we pass in a handle
+ // to our event loop. After the socket's created we inform that we're ready
+ // to go and start accepting connections.
+ let socket = TcpListener::bind(&addr)?;
+ println!("Listening on: {}", addr);
+
+ // Here we convert the `TcpListener` to a stream of incoming connections
+ // with the `incoming` method. We then define how to process each element in
+ // the stream with the `for_each` method.
+ //
+ // This combinator, defined on the `Stream` trait, will allow us to define a
+ // computation to happen for all items on the stream (in this case TCP
+ // connections made to the server). The return value of the `for_each`
+ // method is itself a future representing processing the entire stream of
+ // connections, and ends up being our server.
+ let done = socket
+ .incoming()
+ .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+ .for_each(move |socket| {
+ // Once we're inside this closure this represents an accepted client
+ // from our server. The `socket` is the client connection (similar to
+ // how the standard library operates).
+ //
+ // We're parsing each socket with the `BytesCodec` included in `tokio_io`,
+ // and then we `split` each codec into the reader/writer halves.
+ //
+ // See https://docs.rs/tokio-codec/0.1/src/tokio_codec/bytes_codec.rs.html
+ let framed = BytesCodec::new().framed(socket);
+ let (_writer, reader) = framed.split();
+
+ let processor = reader
+ .for_each(|bytes| {
+ println!("bytes: {:?}", bytes);
+ Ok(())
+ })
+ // After our copy operation is complete we just print out some helpful
+ // information.
+ .and_then(|()| {
+ println!("Socket received FIN packet and closed connection");
+ Ok(())
+ })
+ .or_else(|err| {
+ println!("Socket closed with error: {:?}", err);
+ // We have to return the error to catch it in the next ``.then` call
+ Err(err)
+ })
+ .then(|result| {
+ println!("Socket closed with result: {:?}", result);
+ Ok(())
+ });
+
+ // And this is where much of the magic of this server happens. We
+ // crucially want all clients to make progress concurrently, rather than
+ // blocking one on completion of another. To achieve this we use the
+ // `tokio::spawn` function to execute the work in the background.
+ //
+ // This function will transfer ownership of the future (`msg` in this
+ // case) to the Tokio runtime thread pool that. The thread pool will
+ // drive the future to completion.
+ //
+ // Essentially here we're executing a new task to run concurrently,
+ // which will allow all of our clients to be processed concurrently.
+ tokio::spawn(processor)
+ });
+
+ // And finally now that we've define what our server is, we run it!
+ //
+ // This starts the Tokio runtime, spawns the server task, and blocks the
+ // current thread until all tasks complete execution. Since the `done` task
+ // never completes (it just keeps accepting sockets), `tokio::run` blocks
+ // forever (until ctrl-c is pressed).
+ tokio::run(done);
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/proxy.rs b/third_party/rust/tokio-0.1.22/examples/proxy.rs
new file mode 100644
index 0000000000..2cbcf119a2
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/proxy.rs
@@ -0,0 +1,130 @@
+//! A proxy that forwards data to another server and forwards that server's
+//! responses back to clients.
+//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
+//! You can showcase this by running this in one terminal:
+//!
+//! cargo run --example proxy
+//!
+//! This in another terminal
+//!
+//! cargo run --example echo
+//!
+//! And finally this in another terminal
+//!
+//! cargo run --example connect 127.0.0.1:8081
+//!
+//! This final terminal will connect to our proxy, which will in turn connect to
+//! the echo server, and you'll be able to see data flowing between them.
+
+#![deny(warnings)]
+
+extern crate tokio;
+
+use std::env;
+use std::io::{self, Read, Write};
+use std::net::{Shutdown, SocketAddr};
+use std::sync::{Arc, Mutex};
+
+use tokio::io::{copy, shutdown};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::*;
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
+ let listen_addr = listen_addr.parse::<SocketAddr>()?;
+
+ let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
+ let server_addr = server_addr.parse::<SocketAddr>()?;
+
+ // Create a TCP listener which will listen for incoming connections.
+ let socket = TcpListener::bind(&listen_addr)?;
+ println!("Listening on: {}", listen_addr);
+ println!("Proxying to: {}", server_addr);
+
+ let done = socket
+ .incoming()
+ .map_err(|e| println!("error accepting socket; error = {:?}", e))
+ .for_each(move |client| {
+ let server = TcpStream::connect(&server_addr);
+ let amounts = server.and_then(move |server| {
+ // Create separate read/write handles for the TCP clients that we're
+ // proxying data between. Note that typically you'd use
+ // `AsyncRead::split` for this operation, but we want our writer
+ // handles to have a custom implementation of `shutdown` which
+ // actually calls `TcpStream::shutdown` to ensure that EOF is
+ // transmitted properly across the proxied connection.
+ //
+ // As a result, we wrap up our client/server manually in arcs and
+ // use the impls below on our custom `MyTcpStream` type.
+ let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
+ let client_writer = client_reader.clone();
+ let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
+ let server_writer = server_reader.clone();
+
+ // Copy the data (in parallel) between the client and the server.
+ // After the copy is done we indicate to the remote side that we've
+ // finished by shutting down the connection.
+ let client_to_server = copy(client_reader, server_writer)
+ .and_then(|(n, _, server_writer)| shutdown(server_writer).map(move |_| n));
+
+ let server_to_client = copy(server_reader, client_writer)
+ .and_then(|(n, _, client_writer)| shutdown(client_writer).map(move |_| n));
+
+ client_to_server.join(server_to_client)
+ });
+
+ let msg = amounts
+ .map(move |(from_client, from_server)| {
+ println!(
+ "client wrote {} bytes and received {} bytes",
+ from_client, from_server
+ );
+ })
+ .map_err(|e| {
+ // Don't panic. Maybe the client just disconnected too soon.
+ println!("error: {}", e);
+ });
+
+ tokio::spawn(msg);
+
+ Ok(())
+ });
+
+ tokio::run(done);
+ Ok(())
+}
+
+// This is a custom type used to have a custom implementation of the
+// `AsyncWrite::shutdown` method which actually calls `TcpStream::shutdown` to
+// notify the remote end that we're done writing.
+#[derive(Clone)]
+struct MyTcpStream(Arc<Mutex<TcpStream>>);
+
+impl Read for MyTcpStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.lock().unwrap().read(buf)
+ }
+}
+
+impl Write for MyTcpStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.lock().unwrap().write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl AsyncRead for MyTcpStream {}
+
+impl AsyncWrite for MyTcpStream {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ self.0.lock().unwrap().shutdown(Shutdown::Write)?;
+ Ok(().into())
+ }
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/tinydb.rs b/third_party/rust/tokio-0.1.22/examples/tinydb.rs
new file mode 100644
index 0000000000..11298ed133
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/tinydb.rs
@@ -0,0 +1,227 @@
+//! A "tiny database" and accompanying protocol
+//!
+//! This example shows the usage of shared state amongst all connected clients,
+//! namely a database of key/value pairs. Each connected client can send a
+//! series of GET/SET commands to query the current value of a key or set the
+//! value of a key.
+//!
+//! This example has a simple protocol you can use to interact with the server.
+//! To run, first run this in one terminal window:
+//!
+//! cargo run --example tinydb
+//!
+//! and next in another windows run:
+//!
+//! cargo run --example connect 127.0.0.1:8080
+//!
+//! In the `connect` window you can type in commands where when you hit enter
+//! you'll get a response from the server for that command. An example session
+//! is:
+//!
+//!
+//! $ cargo run --example connect 127.0.0.1:8080
+//! GET foo
+//! foo = bar
+//! GET FOOBAR
+//! error: no key FOOBAR
+//! SET FOOBAR my awesome string
+//! set FOOBAR = `my awesome string`, previous: None
+//! SET foo tokio
+//! set foo = `tokio`, previous: Some("bar")
+//! GET foo
+//! foo = tokio
+//!
+//! Namely you can issue two forms of commands:
+//!
+//! * `GET $key` - this will fetch the value of `$key` from the database and
+//! return it. The server's database is initially populated with the key `foo`
+//! set to the value `bar`
+//! * `SET $key $value` - this will set the value of `$key` to `$value`,
+//! returning the previous value, if any.
+
+#![deny(warnings)]
+
+extern crate tokio;
+
+use std::collections::HashMap;
+use std::env;
+use std::io::BufReader;
+use std::net::SocketAddr;
+use std::sync::{Arc, Mutex};
+
+use tokio::io::{lines, write_all};
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+
+/// The in-memory database shared amongst all clients.
+///
+/// This database will be shared via `Arc`, so to mutate the internal map we're
+/// going to use a `Mutex` for interior mutability.
+struct Database {
+ map: Mutex<HashMap<String, String>>,
+}
+
+/// Possible requests our clients can send us
+enum Request {
+ Get { key: String },
+ Set { key: String, value: String },
+}
+
+/// Responses to the `Request` commands above
+enum Response {
+ Value {
+ key: String,
+ value: String,
+ },
+ Set {
+ key: String,
+ value: String,
+ previous: Option<String>,
+ },
+ Error {
+ msg: String,
+ },
+}
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ // Parse the address we're going to run this server on
+ // and set up our TCP listener to accept connections.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse::<SocketAddr>()?;
+ let listener = TcpListener::bind(&addr).map_err(|_| "failed to bind")?;
+ println!("Listening on: {}", addr);
+
+ // Create the shared state of this server that will be shared amongst all
+ // clients. We populate the initial database and then create the `Database`
+ // structure. Note the usage of `Arc` here which will be used to ensure that
+ // each independently spawned client will have a reference to the in-memory
+ // database.
+ let mut initial_db = HashMap::new();
+ initial_db.insert("foo".to_string(), "bar".to_string());
+ let db = Arc::new(Database {
+ map: Mutex::new(initial_db),
+ });
+
+ let done = listener
+ .incoming()
+ .map_err(|e| println!("error accepting socket; error = {:?}", e))
+ .for_each(move |socket| {
+ // As with many other small examples, the first thing we'll do is
+ // *split* this TCP stream into two separately owned halves. This'll
+ // allow us to work with the read and write halves independently.
+ let (reader, writer) = socket.split();
+
+ // Since our protocol is line-based we use `tokio_io`'s `lines` utility
+ // to convert our stream of bytes, `reader`, into a `Stream` of lines.
+ let lines = lines(BufReader::new(reader));
+
+ // Here's where the meat of the processing in this server happens. First
+ // we see a clone of the database being created, which is creating a
+ // new reference for this connected client to use. Also note the `move`
+ // keyword on the closure here which moves ownership of the reference
+ // into the closure, which we'll need for spawning the client below.
+ //
+ // The `map` function here means that we'll run some code for all
+ // requests (lines) we receive from the client. The actual handling here
+ // is pretty simple, first we parse the request and if it's valid we
+ // generate a response based on the values in the database.
+ let db = db.clone();
+ let responses = lines.map(move |line| {
+ let request = match Request::parse(&line) {
+ Ok(req) => req,
+ Err(e) => return Response::Error { msg: e },
+ };
+
+ let mut db = db.map.lock().unwrap();
+ match request {
+ Request::Get { key } => match db.get(&key) {
+ Some(value) => Response::Value {
+ key,
+ value: value.clone(),
+ },
+ None => Response::Error {
+ msg: format!("no key {}", key),
+ },
+ },
+ Request::Set { key, value } => {
+ let previous = db.insert(key.clone(), value.clone());
+ Response::Set {
+ key,
+ value,
+ previous,
+ }
+ }
+ }
+ });
+
+ // At this point `responses` is a stream of `Response` types which we
+ // now want to write back out to the client. To do that we use
+ // `Stream::fold` to perform a loop here, serializing each response and
+ // then writing it out to the client.
+ let writes = responses.fold(writer, |writer, response| {
+ let mut response = response.serialize();
+ response.push('\n');
+ write_all(writer, response.into_bytes()).map(|(w, _)| w)
+ });
+
+ // Like with other small servers, we'll `spawn` this client to ensure it
+ // runs concurrently with all other clients, for now ignoring any errors
+ // that we see.
+ let msg = writes.then(move |_| Ok(()));
+
+ tokio::spawn(msg)
+ });
+
+ tokio::run(done);
+ Ok(())
+}
+
+impl Request {
+ fn parse(input: &str) -> Result<Request, String> {
+ let mut parts = input.splitn(3, " ");
+ match parts.next() {
+ Some("GET") => {
+ let key = match parts.next() {
+ Some(key) => key,
+ None => return Err(format!("GET must be followed by a key")),
+ };
+ if parts.next().is_some() {
+ return Err(format!("GET's key must not be followed by anything"));
+ }
+ Ok(Request::Get {
+ key: key.to_string(),
+ })
+ }
+ Some("SET") => {
+ let key = match parts.next() {
+ Some(key) => key,
+ None => return Err(format!("SET must be followed by a key")),
+ };
+ let value = match parts.next() {
+ Some(value) => value,
+ None => return Err(format!("SET needs a value")),
+ };
+ Ok(Request::Set {
+ key: key.to_string(),
+ value: value.to_string(),
+ })
+ }
+ Some(cmd) => Err(format!("unknown command: {}", cmd)),
+ None => Err(format!("empty input")),
+ }
+ }
+}
+
+impl Response {
+ fn serialize(&self) -> String {
+ match *self {
+ Response::Value { ref key, ref value } => format!("{} = {}", key, value),
+ Response::Set {
+ ref key,
+ ref value,
+ ref previous,
+ } => format!("set {} = `{}`, previous: {:?}", key, value, previous),
+ Response::Error { ref msg } => format!("error: {}", msg),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs b/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs
new file mode 100644
index 0000000000..cde1b79afb
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs
@@ -0,0 +1,325 @@
+//! A "tiny" example of HTTP request/response handling using transports.
+//!
+//! This example is intended for *learning purposes* to see how various pieces
+//! hook up together and how HTTP can get up and running. Note that this example
+//! is written with the restriction that it *can't* use any "big" library other
+//! than Tokio, if you'd like a "real world" HTTP library you likely want a
+//! crate like Hyper.
+//!
+//! Code here is based on the `echo-threads` example and implements two paths,
+//! the `/plaintext` and `/json` routes to respond with some text and json,
+//! respectively. By default this will run I/O on all the cores your system has
+//! available, and it doesn't support HTTP request bodies.
+
+#![deny(warnings)]
+
+extern crate bytes;
+extern crate http;
+extern crate httparse;
+#[macro_use]
+extern crate serde_derive;
+extern crate serde_json;
+extern crate time;
+extern crate tokio;
+extern crate tokio_io;
+
+use std::net::SocketAddr;
+use std::{env, fmt, io};
+
+use tokio::codec::{Decoder, Encoder};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::*;
+
+use bytes::BytesMut;
+use http::header::HeaderValue;
+use http::{Request, Response, StatusCode};
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ // Parse the arguments, bind the TCP socket we'll be listening to, spin up
+ // our worker threads, and start shipping sockets to those worker threads.
+ let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = addr.parse::<SocketAddr>()?;
+
+ let listener = TcpListener::bind(&addr)?;
+ println!("Listening on: {}", addr);
+
+ tokio::run({
+ listener
+ .incoming()
+ .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+ .for_each(|socket| {
+ process(socket);
+ Ok(())
+ })
+ });
+ Ok(())
+}
+
+fn process(socket: TcpStream) {
+ let (tx, rx) =
+ // Frame the socket using the `Http` protocol. This maps the TCP socket
+ // to a Stream + Sink of HTTP frames.
+ Http.framed(socket)
+ // This splits a single `Stream + Sink` value into two separate handles
+ // that can be used independently (even on different tasks or threads).
+ .split();
+
+ // Map all requests into responses and send them back to the client.
+ let task = tx.send_all(rx.and_then(respond)).then(|res| {
+ if let Err(e) = res {
+ println!("failed to process connection; error = {:?}", e);
+ }
+
+ Ok(())
+ });
+
+ // Spawn the task that handles the connection.
+ tokio::spawn(task);
+}
+
+/// "Server logic" is implemented in this function.
+///
+/// This function is a map from and HTTP request to a future of a response and
+/// represents the various handling a server might do. Currently the contents
+/// here are pretty uninteresting.
+fn respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error> + Send> {
+ let f = future::lazy(move || {
+ let mut response = Response::builder();
+ let body = match req.uri().path() {
+ "/plaintext" => {
+ response.header("Content-Type", "text/plain");
+ "Hello, World!".to_string()
+ }
+ "/json" => {
+ response.header("Content-Type", "application/json");
+
+ #[derive(Serialize)]
+ struct Message {
+ message: &'static str,
+ }
+ serde_json::to_string(&Message {
+ message: "Hello, World!",
+ })?
+ }
+ _ => {
+ response.status(StatusCode::NOT_FOUND);
+ String::new()
+ }
+ };
+ let response = response
+ .body(body)
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+ Ok(response)
+ });
+
+ Box::new(f)
+}
+
+struct Http;
+
+/// Implementation of encoding an HTTP response into a `BytesMut`, basically
+/// just writing out an HTTP/1.1 response.
+impl Encoder for Http {
+ type Item = Response<String>;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> {
+ use std::fmt::Write;
+
+ write!(
+ BytesWrite(dst),
+ "\
+ HTTP/1.1 {}\r\n\
+ Server: Example\r\n\
+ Content-Length: {}\r\n\
+ Date: {}\r\n\
+ ",
+ item.status(),
+ item.body().len(),
+ date::now()
+ )
+ .unwrap();
+
+ for (k, v) in item.headers() {
+ dst.extend_from_slice(k.as_str().as_bytes());
+ dst.extend_from_slice(b": ");
+ dst.extend_from_slice(v.as_bytes());
+ dst.extend_from_slice(b"\r\n");
+ }
+
+ dst.extend_from_slice(b"\r\n");
+ dst.extend_from_slice(item.body().as_bytes());
+
+ return Ok(());
+
+ // Right now `write!` on `Vec<u8>` goes through io::Write and is not
+ // super speedy, so inline a less-crufty implementation here which
+ // doesn't go through io::Error.
+ struct BytesWrite<'a>(&'a mut BytesMut);
+
+ impl<'a> fmt::Write for BytesWrite<'a> {
+ fn write_str(&mut self, s: &str) -> fmt::Result {
+ self.0.extend_from_slice(s.as_bytes());
+ Ok(())
+ }
+
+ fn write_fmt(&mut self, args: fmt::Arguments) -> fmt::Result {
+ fmt::write(self, args)
+ }
+ }
+ }
+}
+
+/// Implementation of decoding an HTTP request from the bytes we've read so far.
+/// This leverages the `httparse` crate to do the actual parsing and then we use
+/// that information to construct an instance of a `http::Request` object,
+/// trying to avoid allocations where possible.
+impl Decoder for Http {
+ type Item = Request<()>;
+ type Error = io::Error;
+
+ fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> {
+ // TODO: we should grow this headers array if parsing fails and asks
+ // for more headers
+ let mut headers = [None; 16];
+ let (method, path, version, amt) = {
+ let mut parsed_headers = [httparse::EMPTY_HEADER; 16];
+ let mut r = httparse::Request::new(&mut parsed_headers);
+ let status = r.parse(src).map_err(|e| {
+ let msg = format!("failed to parse http request: {:?}", e);
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+
+ let amt = match status {
+ httparse::Status::Complete(amt) => amt,
+ httparse::Status::Partial => return Ok(None),
+ };
+
+ let toslice = |a: &[u8]| {
+ let start = a.as_ptr() as usize - src.as_ptr() as usize;
+ assert!(start < src.len());
+ (start, start + a.len())
+ };
+
+ for (i, header) in r.headers.iter().enumerate() {
+ let k = toslice(header.name.as_bytes());
+ let v = toslice(header.value);
+ headers[i] = Some((k, v));
+ }
+
+ (
+ toslice(r.method.unwrap().as_bytes()),
+ toslice(r.path.unwrap().as_bytes()),
+ r.version.unwrap(),
+ amt,
+ )
+ };
+ if version != 1 {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "only HTTP/1.1 accepted",
+ ));
+ }
+ let data = src.split_to(amt).freeze();
+ let mut ret = Request::builder();
+ ret.method(&data[method.0..method.1]);
+ ret.uri(data.slice(path.0, path.1));
+ ret.version(http::Version::HTTP_11);
+ for header in headers.iter() {
+ let (k, v) = match *header {
+ Some((ref k, ref v)) => (k, v),
+ None => break,
+ };
+ let value = unsafe { HeaderValue::from_shared_unchecked(data.slice(v.0, v.1)) };
+ ret.header(&data[k.0..k.1], value);
+ }
+
+ let req = ret
+ .body(())
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+ Ok(Some(req))
+ }
+}
+
+mod date {
+ use std::cell::RefCell;
+ use std::fmt::{self, Write};
+ use std::str;
+
+ use time::{self, Duration};
+
+ pub struct Now(());
+
+ /// Returns a struct, which when formatted, renders an appropriate `Date`
+ /// header value.
+ pub fn now() -> Now {
+ Now(())
+ }
+
+ // Gee Alex, doesn't this seem like premature optimization. Well you see
+ // there Billy, you're absolutely correct! If your server is *bottlenecked*
+ // on rendering the `Date` header, well then boy do I have news for you, you
+ // don't need this optimization.
+ //
+ // In all seriousness, though, a simple "hello world" benchmark which just
+ // sends back literally "hello world" with standard headers actually is
+ // bottlenecked on rendering a date into a byte buffer. Since it was at the
+ // top of a profile, and this was done for some competitive benchmarks, this
+ // module was written.
+ //
+ // Just to be clear, though, I was not intending on doing this because it
+ // really does seem kinda absurd, but it was done by someone else [1], so I
+ // blame them! :)
+ //
+ // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66
+
+ struct LastRenderedNow {
+ bytes: [u8; 128],
+ amt: usize,
+ next_update: time::Timespec,
+ }
+
+ thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow {
+ bytes: [0; 128],
+ amt: 0,
+ next_update: time::Timespec::new(0, 0),
+ }));
+
+ impl fmt::Display for Now {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ LAST.with(|cache| {
+ let mut cache = cache.borrow_mut();
+ let now = time::get_time();
+ if now >= cache.next_update {
+ cache.update(now);
+ }
+ f.write_str(cache.buffer())
+ })
+ }
+ }
+
+ impl LastRenderedNow {
+ fn buffer(&self) -> &str {
+ str::from_utf8(&self.bytes[..self.amt]).unwrap()
+ }
+
+ fn update(&mut self, now: time::Timespec) {
+ self.amt = 0;
+ write!(LocalBuffer(self), "{}", time::at(now).rfc822()).unwrap();
+ self.next_update = now + Duration::seconds(1);
+ self.next_update.nsec = 0;
+ }
+ }
+
+ struct LocalBuffer<'a>(&'a mut LastRenderedNow);
+
+ impl<'a> fmt::Write for LocalBuffer<'a> {
+ fn write_str(&mut self, s: &str) -> fmt::Result {
+ let start = self.0.amt;
+ let end = start + s.len();
+ self.0.bytes[start..end].copy_from_slice(s.as_bytes());
+ self.0.amt += s.len();
+ Ok(())
+ }
+ }
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/udp-client.rs b/third_party/rust/tokio-0.1.22/examples/udp-client.rs
new file mode 100644
index 0000000000..900d3616df
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/udp-client.rs
@@ -0,0 +1,70 @@
+//! A UDP client that just sends everything it gets via `stdio` in a single datagram, and then
+//! waits for a reply.
+//!
+//! For the reasons of simplicity data from `stdio` is read until `EOF` in a blocking manner.
+//!
+//! You can test this out by running an echo server:
+//!
+//! ```
+//! $ cargo run --example echo-udp -- 127.0.0.1:8080
+//! ```
+//!
+//! and running the client in another terminal:
+//!
+//! ```
+//! $ cargo run --example udp-client
+//! ```
+//!
+//! You can optionally provide any custom endpoint address for the client:
+//!
+//! ```
+//! $ cargo run --example udp-client -- 127.0.0.1:8080
+//! ```
+//!
+//! Don't forget to pass `EOF` to the standard input of the client!
+//!
+//! Please mind that since the UDP protocol doesn't have any capabilities to detect a broken
+//! connection the server needs to be run first, otherwise the client will block forever.
+
+extern crate futures;
+extern crate tokio;
+
+use std::env;
+use std::io::stdin;
+use std::net::SocketAddr;
+use tokio::net::UdpSocket;
+use tokio::prelude::*;
+
+fn get_stdin_data() -> Result<Vec<u8>, Box<std::error::Error>> {
+ let mut buf = Vec::new();
+ stdin().read_to_end(&mut buf)?;
+ Ok(buf)
+}
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ let remote_addr: SocketAddr = env::args()
+ .nth(1)
+ .unwrap_or("127.0.0.1:8080".into())
+ .parse()?;
+ // We use port 0 to let the operating system allocate an available port for us.
+ let local_addr: SocketAddr = if remote_addr.is_ipv4() {
+ "0.0.0.0:0"
+ } else {
+ "[::]:0"
+ }
+ .parse()?;
+ let socket = UdpSocket::bind(&local_addr)?;
+ const MAX_DATAGRAM_SIZE: usize = 65_507;
+ socket
+ .send_dgram(get_stdin_data()?, &remote_addr)
+ .and_then(|(socket, _)| socket.recv_dgram(vec![0u8; MAX_DATAGRAM_SIZE]))
+ .map(|(_, data, len, _)| {
+ println!(
+ "Received {} bytes:\n{}",
+ len,
+ String::from_utf8_lossy(&data[..len])
+ )
+ })
+ .wait()?;
+ Ok(())
+}
diff --git a/third_party/rust/tokio-0.1.22/examples/udp-codec.rs b/third_party/rust/tokio-0.1.22/examples/udp-codec.rs
new file mode 100644
index 0000000000..3657d8cc17
--- /dev/null
+++ b/third_party/rust/tokio-0.1.22/examples/udp-codec.rs
@@ -0,0 +1,65 @@
+//! This example leverages `BytesCodec` to create a UDP client and server which
+//! speak a custom protocol.
+//!
+//! Here we're using the codec from tokio-io to convert a UDP socket to a stream of
+//! client messages. These messages are then processed and returned back as a
+//! new message with a new destination. Overall, we then use this to construct a
+//! "ping pong" pair where two sockets are sending messages back and forth.
+
+#![deny(warnings)]
+
+extern crate env_logger;
+extern crate tokio;
+extern crate tokio_codec;
+extern crate tokio_io;
+
+use std::net::SocketAddr;
+
+use tokio::net::{UdpFramed, UdpSocket};
+use tokio::prelude::*;
+use tokio_codec::BytesCodec;
+
+fn main() -> Result<(), Box<std::error::Error>> {
+ let _ = env_logger::init();
+
+ let addr: SocketAddr = "127.0.0.1:0".parse()?;
+
+ // Bind both our sockets and then figure out what ports we got.
+ let a = UdpSocket::bind(&addr)?;
+ let b = UdpSocket::bind(&addr)?;
+ let b_addr = b.local_addr()?;
+
+ // We're parsing each socket with the `BytesCodec` included in `tokio_io`, and then we
+ // `split` each codec into the sink/stream halves.
+ let (a_sink, a_stream) = UdpFramed::new(a, BytesCodec::new()).split();
+ let (b_sink, b_stream) = UdpFramed::new(b, BytesCodec::new()).split();
+
+ // Start off by sending a ping from a to b, afterwards we just print out
+ // what they send us and continually send pings
+ // let pings = stream::iter((0..5).map(Ok));
+ let a = a_sink.send(("PING".into(), b_addr)).and_then(|a_sink| {
+ let mut i = 0;
+ let a_stream = a_stream.take(4).map(move |(msg, addr)| {
+ i += 1;
+ println!("[a] recv: {}", String::from_utf8_lossy(&msg));
+ (format!("PING {}", i).into(), addr)
+ });
+ a_sink.send_all(a_stream)
+ });
+
+ // The second client we have will receive the pings from `a` and then send
+ // back pongs.
+ let b_stream = b_stream.map(|(msg, addr)| {
+ println!("[b] recv: {}", String::from_utf8_lossy(&msg));
+ ("PONG".into(), addr)
+ });
+ let b = b_sink.send_all(b_stream);
+
+ // Spawn the sender of pongs and then wait for our pinger to finish.
+ tokio::run({
+ b.join(a)
+ .map(|_| ())
+ .map_err(|e| println!("error = {:?}", e))
+ });
+ Ok(())
+}