#![cfg(feature = "bilock")] use futures::executor::block_on; use futures::future; use futures::stream; use futures::task::{Context, Poll}; use futures::Future; use futures::StreamExt; use futures_test::task::noop_context; use futures_util::lock::BiLock; use std::pin::Pin; use std::thread; #[test] fn smoke() { let future = future::lazy(|cx| { let (a, b) = BiLock::new(1); { let mut lock = match a.poll_lock(cx) { Poll::Ready(l) => l, Poll::Pending => panic!("poll not ready"), }; assert_eq!(*lock, 1); *lock = 2; assert!(b.poll_lock(cx).is_pending()); assert!(a.poll_lock(cx).is_pending()); } assert!(b.poll_lock(cx).is_ready()); assert!(a.poll_lock(cx).is_ready()); { let lock = match b.poll_lock(cx) { Poll::Ready(l) => l, Poll::Pending => panic!("poll not ready"), }; assert_eq!(*lock, 2); } assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); Ok::<(), ()>(()) }); assert_eq!(block_on(future), Ok(())); } #[test] fn concurrent() { const N: usize = 10000; let mut cx = noop_context(); let (a, b) = BiLock::new(0); let a = Increment { a: Some(a), remaining: N }; let b = stream::iter(0..N).fold(b, |b, _n| async { let mut g = b.lock().await; *g += 1; drop(g); b }); let t1 = thread::spawn(move || block_on(a)); let b = block_on(b); let a = t1.join().unwrap(); match a.poll_lock(&mut cx) { Poll::Ready(l) => assert_eq!(*l, 2 * N), Poll::Pending => panic!("poll not ready"), } match b.poll_lock(&mut cx) { Poll::Ready(l) => assert_eq!(*l, 2 * N), Poll::Pending => panic!("poll not ready"), } assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); struct Increment { remaining: usize, a: Option>, } impl Future for Increment { type Output = BiLock; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if self.remaining == 0 { return self.a.take().unwrap().into(); } let a = self.a.as_mut().unwrap(); let mut a = match a.poll_lock(cx) { Poll::Ready(l) => l, Poll::Pending => return Poll::Pending, }; *a += 1; drop(a); self.remaining -= 1; } } } }