summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.11/examples/proxy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.11/examples/proxy.rs')
-rw-r--r--third_party/rust/tokio-0.1.11/examples/proxy.rs128
1 files changed, 128 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/examples/proxy.rs b/third_party/rust/tokio-0.1.11/examples/proxy.rs
new file mode 100644
index 0000000000..bed8314a31
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/examples/proxy.rs
@@ -0,0 +1,128 @@
+//! A proxy that forwards data to another server and forwards that server's
+//! responses back to clients.
+//!
+//! Because the Tokio runtime uses a thread pool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
+//! You can showcase this by running this in one terminal:
+//!
+//! cargo run --example proxy
+//!
+//! This in another terminal
+//!
+//! cargo run --example echo
+//!
+//! And finally this in another terminal
+//!
+//! cargo run --example connect 127.0.0.1:8081
+//!
+//! This final terminal will connect to our proxy, which will in turn connect to
+//! the echo server, and you'll be able to see data flowing between them.
+
+#![deny(warnings)]
+
+extern crate tokio;
+
+use std::sync::{Arc, Mutex};
+use std::env;
+use std::net::{Shutdown, SocketAddr};
+use std::io::{self, Read, Write};
+
+use tokio::io::{copy, shutdown};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::prelude::*;
+
+fn main() {
+ let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
+ let listen_addr = listen_addr.parse::<SocketAddr>().unwrap();
+
+ let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
+ let server_addr = server_addr.parse::<SocketAddr>().unwrap();
+
+ // Create a TCP listener which will listen for incoming connections.
+ let socket = TcpListener::bind(&listen_addr).unwrap();
+ println!("Listening on: {}", listen_addr);
+ println!("Proxying to: {}", server_addr);
+
+ let done = socket.incoming()
+ .map_err(|e| println!("error accepting socket; error = {:?}", e))
+ .for_each(move |client| {
+ let server = TcpStream::connect(&server_addr);
+ let amounts = server.and_then(move |server| {
+ // Create separate read/write handles for the TCP clients that we're
+ // proxying data between. Note that typically you'd use
+ // `AsyncRead::split` for this operation, but we want our writer
+ // handles to have a custom implementation of `shutdown` which
+ // actually calls `TcpStream::shutdown` to ensure that EOF is
+ // transmitted properly across the proxied connection.
+ //
+ // As a result, we wrap up our client/server manually in arcs and
+ // use the impls below on our custom `MyTcpStream` type.
+ let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
+ let client_writer = client_reader.clone();
+ let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
+ let server_writer = server_reader.clone();
+
+ // Copy the data (in parallel) between the client and the server.
+ // After the copy is done we indicate to the remote side that we've
+ // finished by shutting down the connection.
+ let client_to_server = copy(client_reader, server_writer)
+ .and_then(|(n, _, server_writer)| {
+ shutdown(server_writer).map(move |_| n)
+ });
+
+ let server_to_client = copy(server_reader, client_writer)
+ .and_then(|(n, _, client_writer)| {
+ shutdown(client_writer).map(move |_| n)
+ });
+
+ client_to_server.join(server_to_client)
+ });
+
+ let msg = amounts.map(move |(from_client, from_server)| {
+ println!("client wrote {} bytes and received {} bytes",
+ from_client, from_server);
+ }).map_err(|e| {
+ // Don't panic. Maybe the client just disconnected too soon.
+ println!("error: {}", e);
+ });
+
+ tokio::spawn(msg);
+
+ Ok(())
+ });
+
+ tokio::run(done);
+}
+
+// This is a custom type used to have a custom implementation of the
+// `AsyncWrite::shutdown` method which actually calls `TcpStream::shutdown` to
+// notify the remote end that we're done writing.
+#[derive(Clone)]
+struct MyTcpStream(Arc<Mutex<TcpStream>>);
+
+impl Read for MyTcpStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.lock().unwrap().read(buf)
+ }
+}
+
+impl Write for MyTcpStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.lock().unwrap().write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl AsyncRead for MyTcpStream {}
+
+impl AsyncWrite for MyTcpStream {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ try!(self.0.lock().unwrap().shutdown(Shutdown::Write));
+ Ok(().into())
+ }
+}