use crate::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::Receiver; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. /// /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver /// [`Stream`]: trait@crate::Stream #[derive(Debug)] pub struct ReceiverStream { inner: Receiver, } impl ReceiverStream { /// Create a new `ReceiverStream`. pub fn new(recv: Receiver) -> Self { Self { inner: recv } } /// Get back the inner `Receiver`. pub fn into_inner(self) -> Receiver { self.inner } /// Closes the receiving half of a channel without dropping it. /// /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. Any /// outstanding [`Permit`] values will still be able to send messages. /// /// To guarantee no messages are dropped, after calling `close()`, you must /// receive all items from the stream until `None` is returned. /// /// [`Permit`]: struct@tokio::sync::mpsc::Permit pub fn close(&mut self) { self.inner.close() } } impl Stream for ReceiverStream { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.poll_recv(cx) } } impl AsRef> for ReceiverStream { fn as_ref(&self) -> &Receiver { &self.inner } } impl AsMut> for ReceiverStream { fn as_mut(&mut self) -> &mut Receiver { &mut self.inner } } impl From> for ReceiverStream { fn from(recv: Receiver) -> Self { Self::new(recv) } }