use crate::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::UnboundedReceiver; /// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. /// /// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver /// [`Stream`]: trait@crate::Stream #[derive(Debug)] pub struct UnboundedReceiverStream { inner: UnboundedReceiver, } impl UnboundedReceiverStream { /// Create a new `UnboundedReceiverStream`. pub fn new(recv: UnboundedReceiver) -> Self { Self { inner: recv } } /// Get back the inner `UnboundedReceiver`. pub fn into_inner(self) -> UnboundedReceiver { self.inner } /// Closes the receiving half of a channel without dropping it. /// /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. pub fn close(&mut self) { self.inner.close() } } impl Stream for UnboundedReceiverStream { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.poll_recv(cx) } } impl AsRef> for UnboundedReceiverStream { fn as_ref(&self) -> &UnboundedReceiver { &self.inner } } impl AsMut> for UnboundedReceiverStream { fn as_mut(&mut self) -> &mut UnboundedReceiver { &mut self.inner } } impl From> for UnboundedReceiverStream { fn from(recv: UnboundedReceiver) -> Self { Self::new(recv) } }