// FIXME: re-implement tests with `async/await`
/*
#[cfg(feature = "runtime")]
use std::collections::HashMap;
use std::cmp;
use std::io::{self, Read, Write};
#[cfg(feature = "runtime")]
use std::sync::{Arc, Mutex};

use bytes::Buf;
use futures::{Async, Poll};
#[cfg(feature = "runtime")]
use futures::Future;
use futures::task::{self, Task};
use tokio_io::{AsyncRead, AsyncWrite};

#[cfg(feature = "runtime")]
use crate::client::connect::{Connect, Connected, Destination};

#[cfg(feature = "runtime")]
pub struct Duplex {
    inner: Arc<Mutex<DuplexInner>>,
}

#[cfg(feature = "runtime")]
struct DuplexInner {
    handle_read_task: Option<Task>,
    // NOTE(review): the `AsyncIo` type parameter was lost from this text;
    // `MockCursor` is assumed from the `.inner.vec` / `.inner.pos` accesses
    // in `DuplexHandle::write` -- confirm against `AsyncIo`'s definition.
    read: AsyncIo<MockCursor>,
    write: AsyncIo<MockCursor>,
}

#[cfg(feature = "runtime")]
impl Duplex {
    pub(crate) fn channel() -> (Duplex, DuplexHandle) {
        let mut inner = DuplexInner {
            handle_read_task: None,
            read: AsyncIo::new_buf(Vec::new(), 0),
            write: AsyncIo::new_buf(Vec::new(), std::usize::MAX),
        };

        inner.read.park_tasks(true);
        inner.write.park_tasks(true);

        let inner = Arc::new(Mutex::new(inner));

        let duplex = Duplex {
            inner: inner.clone(),
        };
        let handle = DuplexHandle {
            inner: inner,
        };

        (duplex, handle)
    }
}

#[cfg(feature = "runtime")]
impl Read for Duplex {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.lock().unwrap().read.read(buf)
    }
}

#[cfg(feature = "runtime")]
impl Write for Duplex {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut inner = self.inner.lock().unwrap();
        let ret = inner.write.write(buf);
        if let Some(task) = inner.handle_read_task.take() {
            trace!("waking DuplexHandle read");
            task.notify();
        }
        ret
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.lock().unwrap().write.flush()
    }
}

#[cfg(feature = "runtime")]
impl AsyncRead for Duplex {
}

#[cfg(feature = "runtime")]
impl AsyncWrite for Duplex {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        Ok(().into())
    }

    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
        let mut inner = self.inner.lock().unwrap();
        if let Some(task) = inner.handle_read_task.take() {
            task.notify();
        }
        inner.write.write_buf(buf)
    }
}

#[cfg(feature = "runtime")]
pub struct DuplexHandle {
    inner: Arc<Mutex<DuplexInner>>,
}

#[cfg(feature = "runtime")]
impl DuplexHandle {
    pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
        let mut inner = self.inner.lock().unwrap();
        assert!(buf.len() >= inner.write.inner.len());
        if inner.write.inner.is_empty() {
            trace!("DuplexHandle read parking");
            inner.handle_read_task = Some(task::current());
            return Ok(Async::NotReady);
        }
        inner.write.read(buf).map(Async::Ready)
    }

    pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> {
        let mut inner = self.inner.lock().unwrap();
        assert_eq!(inner.read.inner.pos, 0);
        assert_eq!(inner.read.inner.vec.len(), 0, "write but read isn't empty");
        inner
            .read
            .inner
            .vec
            .extend(bytes);
        inner.read.block_in(bytes.len());
        Ok(Async::Ready(bytes.len()))
    }
}

#[cfg(feature = "runtime")]
impl Drop for DuplexHandle {
    fn drop(&mut self) {
        trace!("mock duplex handle drop");
        if !::std::thread::panicking() {
            let mut inner = self.inner.lock().unwrap();
            inner.read.close();
            inner.write.close();
        }
    }
}

#[cfg(feature = "runtime")]
type BoxedConnectFut = Box<dyn Future<Item=(Duplex, Connected), Error=io::Error> + Send>;

#[cfg(feature = "runtime")]
#[derive(Clone)]
pub struct MockConnector {
    mocks: Arc<Mutex<MockedConnections>>,
}

#[cfg(feature = "runtime")]
struct MockedConnections(HashMap<String, Vec<BoxedConnectFut>>);

#[cfg(feature = "runtime")]
impl MockConnector {
    pub fn new() -> MockConnector {
        MockConnector {
            mocks: Arc::new(Mutex::new(MockedConnections(HashMap::new()))),
        }
    }

    pub fn mock(&mut self, key: &str) -> DuplexHandle {
        use futures::future;
        self.mock_fut(key, future::ok::<_, ()>(()))
    }

    pub fn mock_fut<F>(&mut self, key: &str, fut: F) -> DuplexHandle
    where
        F: Future + Send + 'static,
    {
        self.mock_opts(key, Connected::new(), fut)
    }

    pub fn mock_opts<F>(&mut self, key: &str, connected: Connected, fut: F) -> DuplexHandle
    where
        F: Future + Send + 'static,
    {
        let key = key.to_owned();

        let (duplex, handle) = Duplex::channel();

        let fut = Box::new(fut.then(move |_| {
            trace!("MockConnector mocked fut ready");
            Ok((duplex, connected))
        }));
        self.mocks.lock().unwrap().0.entry(key)
            .or_insert(Vec::new())
            .push(fut);

        handle
    }
}

#[cfg(feature = "runtime")]
impl Connect for MockConnector {
    type Transport = Duplex;
    type Error = io::Error;
    type Future = BoxedConnectFut;

    fn connect(&self, dst: Destination) -> Self::Future {
        trace!("mock connect: {:?}", dst);
        let key = format!("{}://{}{}", dst.scheme(), dst.host(), if let Some(port) = dst.port() {
            format!(":{}", port)
        } else {
            "".to_owned()
        });
        let mut mocks = self.mocks.lock().unwrap();
        let mocks = mocks.0.get_mut(&key)
            .expect(&format!("unknown mocks uri: {}", key));
        assert!(!mocks.is_empty(), "no additional mocks for {}", key);
        mocks.remove(0)
    }
}

#[cfg(feature = "runtime")]
impl Drop for MockedConnections {
    fn drop(&mut self) {
        if !::std::thread::panicking() {
            for (key, mocks) in self.0.iter() {
                assert_eq!(
                    mocks.len(),
                    0,
                    "not all mocked connects for {:?} were used",
                    key,
                );
            }
        }
    }
}
*/