#![allow(dead_code)] use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; use tokio_stream::Stream; struct UnboundedStream { recv: UnboundedReceiver, } impl Stream for UnboundedStream { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::into_inner(self).recv.poll_recv(cx) } } pub fn unbounded_channel_stream() -> (UnboundedSender, impl Stream) { let (tx, rx) = mpsc::unbounded_channel(); let stream = UnboundedStream { recv: rx }; (tx, stream) } struct BoundedStream { recv: Receiver, } impl Stream for BoundedStream { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::into_inner(self).recv.poll_recv(cx) } } pub fn channel_stream(size: usize) -> (Sender, impl Stream) { let (tx, rx) = mpsc::channel(size); let stream = BoundedStream { recv: rx }; (tx, stream) }