use crate::stream::{FuturesUnordered, StreamExt}; use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; use core::cmp::Ordering; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; use futures_core::stream::Stream; use futures_core::{ task::{Context, Poll}, FusedStream, }; use pin_project_lite::pin_project; pin_project! { #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] struct OrderWrapper { #[pin] data: T, // A future or a future's output index: isize, } } impl PartialEq for OrderWrapper { fn eq(&self, other: &Self) -> bool { self.index == other.index } } impl Eq for OrderWrapper {} impl PartialOrd for OrderWrapper { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Ord for OrderWrapper { fn cmp(&self, other: &Self) -> Ordering { // BinaryHeap is a max heap, so compare backwards here. other.index.cmp(&self.index) } } impl Future for OrderWrapper where T: Future, { type Output = OrderWrapper; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let index = self.index; self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index }) } } /// An unbounded queue of futures. /// /// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order /// on top of the set of futures. While futures in the set will race to /// completion in parallel, results will only be returned in the order their /// originating futures were added to the queue. /// /// Futures are pushed into this queue and their realized values are yielded in /// order. This structure is optimized to manage a large number of futures. /// Futures managed by `FuturesOrdered` will only be polled when they generate /// notifications. This reduces the required amount of work needed to coordinate /// large numbers of futures. /// /// When a `FuturesOrdered` is first created, it does not contain any futures. /// Calling `poll` in this state will result in `Poll::Ready(None))` to be /// returned. Futures are submitted to the queue using `push`; however, the /// future will **not** be polled at this point. `FuturesOrdered` will only /// poll managed futures when `FuturesOrdered::poll` is called. As such, it /// is important to call `poll` after pushing new futures. /// /// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that /// the queue is currently not managing any futures. A future may be submitted /// to the queue at a later time. At that point, a call to /// `FuturesOrdered::poll` will either return the future's resolved value /// **or** `Poll::Pending` if the future has not yet completed. When /// multiple futures are submitted to the queue, `FuturesOrdered::poll` will /// return `Poll::Pending` until the first future completes, even if /// some of the later futures have already completed. /// /// Note that you can create a ready-made `FuturesOrdered` via the /// [`collect`](Iterator::collect) method, or you can start with an empty queue /// with the `FuturesOrdered::new` constructor. /// /// This type is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. #[must_use = "streams do nothing unless polled"] pub struct FuturesOrdered { in_progress_queue: FuturesUnordered>, queued_outputs: BinaryHeap>, next_incoming_index: isize, next_outgoing_index: isize, } impl Unpin for FuturesOrdered {} impl FuturesOrdered { /// Constructs a new, empty `FuturesOrdered` /// /// The returned `FuturesOrdered` does not contain any futures and, in this /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. pub fn new() -> Self { Self { in_progress_queue: FuturesUnordered::new(), queued_outputs: BinaryHeap::new(), next_incoming_index: 0, next_outgoing_index: 0, } } /// Returns the number of futures contained in the queue. /// /// This represents the total number of in-flight futures, both /// those currently processing and those that have completed but /// which are waiting for earlier futures to complete. pub fn len(&self) -> usize { self.in_progress_queue.len() + self.queued_outputs.len() } /// Returns `true` if the queue contains no futures pub fn is_empty(&self) -> bool { self.in_progress_queue.is_empty() && self.queued_outputs.is_empty() } /// Push a future into the queue. /// /// This function submits the given future to the internal set for managing. /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { self.push_back(future); } /// Pushes a future to the back of the queue. /// /// This function submits the given future to the internal set for managing. /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } /// Pushes a future to the front of the queue. /// /// This function submits the given future to the internal set for managing. /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. This future will be the next future to be returned /// complete. pub fn push_front(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; self.next_outgoing_index -= 1; self.in_progress_queue.push(wrapped); } } impl Default for FuturesOrdered { fn default() -> Self { Self::new() } } impl Stream for FuturesOrdered { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; // Check to see if we've already received the next value if let Some(next_output) = this.queued_outputs.peek_mut() { if next_output.index == this.next_outgoing_index { this.next_outgoing_index += 1; return Poll::Ready(Some(PeekMut::pop(next_output).data)); } } loop { match ready!(this.in_progress_queue.poll_next_unpin(cx)) { Some(output) => { if output.index == this.next_outgoing_index { this.next_outgoing_index += 1; return Poll::Ready(Some(output.data)); } else { this.queued_outputs.push(output) } } None => return Poll::Ready(None), } } } fn size_hint(&self) -> (usize, Option) { let len = self.len(); (len, Some(len)) } } impl Debug for FuturesOrdered { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "FuturesOrdered {{ ... }}") } } impl FromIterator for FuturesOrdered { fn from_iter(iter: T) -> Self where T: IntoIterator, { let acc = Self::new(); iter.into_iter().fold(acc, |mut acc, item| { acc.push_back(item); acc }) } } impl FusedStream for FuturesOrdered { fn is_terminated(&self) -> bool { self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty() } } impl Extend for FuturesOrdered { fn extend(&mut self, iter: I) where I: IntoIterator, { for item in iter { self.push_back(item); } } }