summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/stream
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/stream')
-rw-r--r--vendor/futures-util/src/stream/futures_ordered.rs36
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/iter.rs4
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/mod.rs61
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs2
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/task.rs11
-rw-r--r--vendor/futures-util/src/stream/mod.rs15
-rw-r--r--vendor/futures-util/src/stream/select_all.rs35
-rw-r--r--vendor/futures-util/src/stream/select_with_strategy.rs143
-rw-r--r--vendor/futures-util/src/stream/stream/buffer_unordered.rs6
-rw-r--r--vendor/futures-util/src/stream/stream/buffered.rs14
-rw-r--r--vendor/futures-util/src/stream/stream/chain.rs3
-rw-r--r--vendor/futures-util/src/stream/stream/chunks.rs11
-rw-r--r--vendor/futures-util/src/stream/stream/collect.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/filter.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/filter_map.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/flatten_unordered.rs536
-rw-r--r--vendor/futures-util/src/stream/stream/mod.rs138
-rw-r--r--vendor/futures-util/src/stream/stream/peek.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/ready_chunks.rs53
-rw-r--r--vendor/futures-util/src/stream/stream/skip_while.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/split.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/take.rs6
-rw-r--r--vendor/futures-util/src/stream/stream/take_while.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/then.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/unzip.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/zip.rs4
-rw-r--r--vendor/futures-util/src/stream/try_stream/and_then.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/into_async_read.rs101
-rw-r--r--vendor/futures-util/src/stream/try_stream/mod.rs92
-rw-r--r--vendor/futures-util/src/stream/try_stream/or_else.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_buffered.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_chunks.rs11
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_collect.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_filter.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_filter_map.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs176
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_skip_while.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_take_while.rs2
38 files changed, 1238 insertions, 254 deletions
diff --git a/vendor/futures-util/src/stream/futures_ordered.rs b/vendor/futures-util/src/stream/futures_ordered.rs
index f596b3b0e..618bf1b7b 100644
--- a/vendor/futures-util/src/stream/futures_ordered.rs
+++ b/vendor/futures-util/src/stream/futures_ordered.rs
@@ -19,7 +19,7 @@ pin_project! {
struct OrderWrapper<T> {
#[pin]
data: T, // A future or a future's output
- index: usize,
+ index: isize,
}
}
@@ -58,7 +58,7 @@ where
/// An unbounded queue of futures.
///
-/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
+/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order
/// on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
@@ -95,8 +95,8 @@ where
pub struct FuturesOrdered<T: Future> {
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
- next_incoming_index: usize,
- next_outgoing_index: usize,
+ next_incoming_index: isize,
+ next_outgoing_index: isize,
}
impl<T: Future> Unpin for FuturesOrdered<T> {}
@@ -135,11 +135,35 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// This function will not call `poll` on the submitted future. The caller
/// must ensure that `FuturesOrdered::poll` is called in order to receive
/// task notifications.
+ #[deprecated(note = "use `push_back` instead")]
pub fn push(&mut self, future: Fut) {
+ self.push_back(future);
+ }
+
+ /// Pushes a future to the back of the queue.
+ ///
+ /// This function submits the given future to the internal set for managing.
+ /// This function will not call `poll` on the submitted future. The caller
+ /// must ensure that `FuturesOrdered::poll` is called in order to receive
+ /// task notifications.
+ pub fn push_back(&mut self, future: Fut) {
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
self.next_incoming_index += 1;
self.in_progress_queue.push(wrapped);
}
+
+ /// Pushes a future to the front of the queue.
+ ///
+ /// This function submits the given future to the internal set for managing.
+ /// This function will not call `poll` on the submitted future. The caller
+ /// must ensure that `FuturesOrdered::poll` is called in order to receive
+ /// task notifications. This future will be the next future to be returned
+ /// complete.
+ pub fn push_front(&mut self, future: Fut) {
+ let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
+ self.next_outgoing_index -= 1;
+ self.in_progress_queue.push(wrapped);
+ }
}
impl<Fut: Future> Default for FuturesOrdered<Fut> {
@@ -196,7 +220,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
{
let acc = Self::new();
iter.into_iter().fold(acc, |mut acc, item| {
- acc.push(item);
+ acc.push_back(item);
acc
})
}
@@ -214,7 +238,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
I: IntoIterator<Item = Fut>,
{
for item in iter {
- self.push(item);
+ self.push_back(item);
}
}
}
diff --git a/vendor/futures-util/src/stream/futures_unordered/iter.rs b/vendor/futures-util/src/stream/futures_unordered/iter.rs
index 04db5ee75..20248c70f 100644
--- a/vendor/futures-util/src/stream/futures_unordered/iter.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/iter.rs
@@ -2,6 +2,7 @@ use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use core::pin::Pin;
+use core::ptr;
use core::sync::atomic::Ordering::Relaxed;
/// Mutable iterator over all futures in the unordered set.
@@ -58,6 +59,9 @@ impl<Fut: Unpin> Iterator for IntoIter<Fut> {
// valid `next_all` checks can be skipped.
let next = (**task).next_all.load(Relaxed);
*task = next;
+ if !task.is_null() {
+ *(**task).prev_all.get() = ptr::null_mut();
+ }
self.len -= 1;
Some(future)
}
diff --git a/vendor/futures-util/src/stream/futures_unordered/mod.rs b/vendor/futures-util/src/stream/futures_unordered/mod.rs
index aab2bb446..6b5804dc4 100644
--- a/vendor/futures-util/src/stream/futures_unordered/mod.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/mod.rs
@@ -6,7 +6,6 @@
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
-use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
@@ -23,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;
mod iter;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};
mod task;
@@ -31,35 +31,11 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
-/// Constant used for a `FuturesUnordered` to determine how many times it is
-/// allowed to poll underlying futures without yielding.
-///
-/// A single call to `poll_next` may potentially do a lot of work before
-/// yielding. This happens in particular if the underlying futures are awoken
-/// frequently but continue to return `Pending`. This is problematic if other
-/// tasks are waiting on the executor, since they do not get to run. This value
-/// caps the number of calls to `poll` on underlying futures a single call to
-/// `poll_next` is allowed to make.
-///
-/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
-/// that amortize wakeup and scheduling costs, but low enough that we do not
-/// starve other tasks for long.
-///
-/// See also https://github.com/rust-lang/futures-rs/issues/2047.
-///
-/// Note that using the length of the `FuturesUnordered` instead of this value
-/// may cause problems if the number of futures is large.
-/// See also https://github.com/rust-lang/futures-rs/pull/2527.
-///
-/// Additionally, polling the same future twice per iteration may cause another
-/// problem. So, when using this value, it is necessary to limit the max value
-/// based on the length of the `FuturesUnordered`.
-/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
-/// See also https://github.com/rust-lang/futures-rs/pull/2333.
-const YIELD_EVERY: usize = 32;
-
/// A set of futures which may complete in any order.
///
+/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this
+/// type that preserves a FIFO order.
+///
/// This structure is optimized to manage a large number of futures.
/// Futures managed by [`FuturesUnordered`] will only be polled when they
/// generate wake-up notifications. This reduces the required amount of work
@@ -149,8 +125,9 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Weak::new(),
+ woken: AtomicBool::new(false),
});
- let stub_ptr = &*stub as *const Task<Fut>;
+ let stub_ptr = Arc::as_ptr(&stub);
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
waker: AtomicWaker::new(),
head: AtomicPtr::new(stub_ptr as *mut _),
@@ -195,6 +172,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
+ woken: AtomicBool::new(false),
});
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -403,7 +381,7 @@ impl<Fut> FuturesUnordered<Fut> {
// The `ReadyToRunQueue` stub is never inserted into the `head_all`
// list, and its pointer value will remain valid for the lifetime of
// this `FuturesUnordered`, so we can make use of its value here.
- &*self.ready_to_run_queue.stub as *const _ as *mut _
+ Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _
}
}
@@ -411,12 +389,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- // See YIELD_EVERY docs for more.
- let yield_every = cmp::min(self.len(), YIELD_EVERY);
+ let len = self.len();
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
+ let mut yielded = 0;
// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
@@ -527,7 +505,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// the internal allocation, appropriately accessing fields and
// deallocating the task if need be.
let res = {
- let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
+ let task = bomb.task.as_ref().unwrap();
+ // We are only interested in whether the future is awoken before it
+ // finishes polling, so reset the flag here.
+ task.woken.store(false, Relaxed);
+ let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);
// Safety: We won't move the future ever again
@@ -540,12 +522,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
+ // If the future was awoken during polling, we assume
+ // the future wanted to explicitly yield.
+ yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);
- if polled == yield_every {
- // We have polled a large number of futures in a row without yielding.
- // To ensure we do not starve other tasks waiting on the executor,
- // we yield here, but immediately wake ourselves up to continue.
+ // If a future yields, we respect it and yield here.
+ // If all futures have been polled, we also yield here to
+ // avoid starving other tasks waiting on the executor.
+ // (polling the same future twice per iteration may cause
+ // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
+ if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}
diff --git a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
index 5ef6cde83..451870532 100644
--- a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
@@ -83,7 +83,7 @@ impl<Fut> ReadyToRunQueue<Fut> {
}
pub(super) fn stub(&self) -> *const Task<Fut> {
- &*self.stub
+ Arc::as_ptr(&self.stub)
}
// Clear the queue of tasks.
diff --git a/vendor/futures-util/src/stream/futures_unordered/task.rs b/vendor/futures-util/src/stream/futures_unordered/task.rs
index da2cd67d9..ec2114eff 100644
--- a/vendor/futures-util/src/stream/futures_unordered/task.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/task.rs
@@ -1,6 +1,6 @@
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
-use core::sync::atomic::Ordering::{self, SeqCst};
+use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};
use super::abort::abort;
@@ -31,6 +31,11 @@ pub(super) struct Task<Fut> {
// Whether or not this task is currently in the ready to run queue
pub(super) queued: AtomicBool,
+
+ // Whether the future was awoken during polling
+ // It is possible for this flag to be set to true after the polling,
+ // but it will be ignored.
+ pub(super) woken: AtomicBool,
}
// `Task` can be sent across threads safely because it ensures that
@@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> {
None => return,
};
+ arc_self.woken.store(true, Relaxed);
+
// It's our job to enqueue this task it into the ready to run queue. To
// do this we set the `queued` flag, and if successful we then do the
// actual queueing operation, ensuring that we're only queued once.
@@ -62,7 +69,7 @@ impl<Fut> ArcWake for Task<Fut> {
// still.
let prev = arc_self.queued.swap(true, SeqCst);
if !prev {
- inner.enqueue(&**arc_self);
+ inner.enqueue(Arc::as_ptr(arc_self));
inner.waker.wake();
}
}
diff --git a/vendor/futures-util/src/stream/mod.rs b/vendor/futures-util/src/stream/mod.rs
index ec685b984..bf9506147 100644
--- a/vendor/futures-util/src/stream/mod.rs
+++ b/vendor/futures-util/src/stream/mod.rs
@@ -18,9 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
- Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
- Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
- Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
+ All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
+ Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
+ SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
+ Unzip, Zip,
};
#[cfg(feature = "std")]
@@ -38,7 +39,9 @@ pub use self::stream::Forward;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
-pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent};
+pub use self::stream::{
+ BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent,
+};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "sink")]
@@ -60,7 +63,9 @@ pub use self::try_stream::IntoAsyncRead;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
-pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
+pub use self::try_stream::{
+ TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent,
+};
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryChunks, TryChunksError};
diff --git a/vendor/futures-util/src/stream/select_all.rs b/vendor/futures-util/src/stream/select_all.rs
index 3474331ad..121b6a0e5 100644
--- a/vendor/futures-util/src/stream/select_all.rs
+++ b/vendor/futures-util/src/stream/select_all.rs
@@ -8,29 +8,24 @@ use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project_lite::pin_project;
-
use super::assert_stream;
use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture};
-pin_project! {
- /// An unbounded set of streams
- ///
- /// This "combinator" provides the ability to maintain a set of streams
- /// and drive them all to completion.
- ///
- /// Streams are pushed into this set and their realized values are
- /// yielded as they become ready. Streams will only be polled when they
- /// generate notifications. This allows to coordinate a large number of streams.
- ///
- /// Note that you can create a ready-made `SelectAll` via the
- /// `select_all` function in the `stream` module, or you can start with an
- /// empty set with the `SelectAll::new` constructor.
- #[must_use = "streams do nothing unless polled"]
- pub struct SelectAll<St> {
- #[pin]
- inner: FuturesUnordered<StreamFuture<St>>,
- }
+/// An unbounded set of streams
+///
+/// This "combinator" provides the ability to maintain a set of streams
+/// and drive them all to completion.
+///
+/// Streams are pushed into this set and their realized values are
+/// yielded as they become ready. Streams will only be polled when they
+/// generate notifications. This allows to coordinate a large number of streams.
+///
+/// Note that you can create a ready-made `SelectAll` via the
+/// `select_all` function in the `stream` module, or you can start with an
+/// empty set with the `SelectAll::new` constructor.
+#[must_use = "streams do nothing unless polled"]
+pub struct SelectAll<St> {
+ inner: FuturesUnordered<StreamFuture<St>>,
}
impl<St: Debug> Debug for SelectAll<St> {
diff --git a/vendor/futures-util/src/stream/select_with_strategy.rs b/vendor/futures-util/src/stream/select_with_strategy.rs
index bd86990cd..224d5f821 100644
--- a/vendor/futures-util/src/stream/select_with_strategy.rs
+++ b/vendor/futures-util/src/stream/select_with_strategy.rs
@@ -1,5 +1,4 @@
use super::assert_stream;
-use crate::stream::{Fuse, StreamExt};
use core::{fmt, pin::Pin};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
@@ -18,13 +17,15 @@ impl PollNext {
/// Toggle the value and return the old one.
pub fn toggle(&mut self) -> Self {
let old = *self;
+ *self = self.other();
+ old
+ }
+ fn other(&self) -> PollNext {
match self {
- PollNext::Left => *self = PollNext::Right,
- PollNext::Right => *self = PollNext::Left,
+ PollNext::Left => PollNext::Right,
+ PollNext::Right => PollNext::Left,
}
-
- old
}
}
@@ -34,14 +35,41 @@ impl Default for PollNext {
}
}
+enum InternalState {
+ Start,
+ LeftFinished,
+ RightFinished,
+ BothFinished,
+}
+
+impl InternalState {
+ fn finish(&mut self, ps: PollNext) {
+ match (&self, ps) {
+ (InternalState::Start, PollNext::Left) => {
+ *self = InternalState::LeftFinished;
+ }
+ (InternalState::Start, PollNext::Right) => {
+ *self = InternalState::RightFinished;
+ }
+ (InternalState::LeftFinished, PollNext::Right)
+ | (InternalState::RightFinished, PollNext::Left) => {
+ *self = InternalState::BothFinished;
+ }
+ _ => {}
+ }
+ }
+}
+
pin_project! {
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
#[must_use = "streams do nothing unless polled"]
+ #[project = SelectWithStrategyProj]
pub struct SelectWithStrategy<St1, St2, Clos, State> {
#[pin]
- stream1: Fuse<St1>,
+ stream1: St1,
#[pin]
- stream2: Fuse<St2>,
+ stream2: St2,
+ internal_state: InternalState,
state: State,
clos: Clos,
}
@@ -120,9 +148,10 @@ where
State: Default,
{
assert_stream::<St1::Item, _>(SelectWithStrategy {
- stream1: stream1.fuse(),
- stream2: stream2.fuse(),
+ stream1,
+ stream2,
state: Default::default(),
+ internal_state: InternalState::Start,
clos: which,
})
}
@@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Acquires a reference to the underlying streams that this combinator is
/// pulling from.
pub fn get_ref(&self) -> (&St1, &St2) {
- (self.stream1.get_ref(), self.stream2.get_ref())
+ (&self.stream1, &self.stream2)
}
/// Acquires a mutable reference to the underlying streams that this
@@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
- (self.stream1.get_mut(), self.stream2.get_mut())
+ (&mut self.stream1, &mut self.stream2)
}
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -150,7 +179,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
let this = self.project();
- (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
+ (this.stream1, this.stream2)
}
/// Consumes this combinator, returning the underlying streams.
@@ -158,7 +187,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> (St1, St2) {
- (self.stream1.into_inner(), self.stream2.into_inner())
+ (self.stream1, self.stream2)
}
}
@@ -169,47 +198,93 @@ where
Clos: FnMut(&mut State) -> PollNext,
{
fn is_terminated(&self) -> bool {
- self.stream1.is_terminated() && self.stream2.is_terminated()
+ match self.internal_state {
+ InternalState::BothFinished => true,
+ _ => false,
+ }
}
}
-impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
+#[inline]
+fn poll_side<St1, St2, Clos, State>(
+ select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
+ side: PollNext,
+ cx: &mut Context<'_>,
+) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
- Clos: FnMut(&mut State) -> PollNext,
{
- type Item = St1::Item;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
- let this = self.project();
-
- match (this.clos)(this.state) {
- PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
- PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
- }
+ match side {
+ PollNext::Left => select.stream1.as_mut().poll_next(cx),
+ PollNext::Right => select.stream2.as_mut().poll_next(cx),
}
}
-fn poll_inner<St1, St2>(
- a: Pin<&mut St1>,
- b: Pin<&mut St2>,
+#[inline]
+fn poll_inner<St1, St2, Clos, State>(
+ select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
+ side: PollNext,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
- let a_done = match a.poll_next(cx) {
+ let first_done = match poll_side(select, side, cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
- Poll::Ready(None) => true,
+ Poll::Ready(None) => {
+ select.internal_state.finish(side);
+ true
+ }
Poll::Pending => false,
};
+ let other = side.other();
+ match poll_side(select, other, cx) {
+ Poll::Ready(None) => {
+ select.internal_state.finish(other);
+ if first_done {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+ a => a,
+ }
+}
+
+impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
+where
+ St1: Stream,
+ St2: Stream<Item = St1::Item>,
+ Clos: FnMut(&mut State) -> PollNext,
+{
+ type Item = St1::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
+ let mut this = self.project();
- match b.poll_next(cx) {
- Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
- Poll::Ready(None) if a_done => Poll::Ready(None),
- Poll::Ready(None) | Poll::Pending => Poll::Pending,
+ match this.internal_state {
+ InternalState::Start => {
+ let next_side = (this.clos)(this.state);
+ poll_inner(&mut this, next_side, cx)
+ }
+ InternalState::LeftFinished => match this.stream2.poll_next(cx) {
+ Poll::Ready(None) => {
+ *this.internal_state = InternalState::BothFinished;
+ Poll::Ready(None)
+ }
+ a => a,
+ },
+ InternalState::RightFinished => match this.stream1.poll_next(cx) {
+ Poll::Ready(None) => {
+ *this.internal_state = InternalState::BothFinished;
+ Poll::Ready(None)
+ }
+ a => a,
+ },
+ InternalState::BothFinished => Poll::Ready(None),
+ }
}
}
diff --git a/vendor/futures-util/src/stream/stream/buffer_unordered.rs b/vendor/futures-util/src/stream/stream/buffer_unordered.rs
index d64c142b4..91b0f6bcc 100644
--- a/vendor/futures-util/src/stream/stream/buffer_unordered.rs
+++ b/vendor/futures-util/src/stream/stream/buffer_unordered.rs
@@ -41,11 +41,7 @@ where
St: Stream,
St::Item: Future,
{
- pub(super) fn new(stream: St, n: usize) -> Self
- where
- St: Stream,
- St::Item: Future,
- {
+ pub(super) fn new(stream: St, n: usize) -> Self {
Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
diff --git a/vendor/futures-util/src/stream/stream/buffered.rs b/vendor/futures-util/src/stream/stream/buffered.rs
index 6052a737b..5854eb7ea 100644
--- a/vendor/futures-util/src/stream/stream/buffered.rs
+++ b/vendor/futures-util/src/stream/stream/buffered.rs
@@ -1,4 +1,4 @@
-use crate::stream::{Fuse, FuturesOrdered, StreamExt};
+use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
@@ -64,7 +64,7 @@ where
// our queue of futures.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx) {
- Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
@@ -95,6 +95,16 @@ where
}
}
+impl<St> FusedStream for Buffered<St>
+where
+ St: Stream,
+ St::Item: Future,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_done() && self.in_progress_queue.is_terminated()
+ }
+}
+
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Buffered<S>
diff --git a/vendor/futures-util/src/stream/stream/chain.rs b/vendor/futures-util/src/stream/stream/chain.rs
index c5da35e25..36ff1e533 100644
--- a/vendor/futures-util/src/stream/stream/chain.rs
+++ b/vendor/futures-util/src/stream/stream/chain.rs
@@ -50,8 +50,9 @@ where
if let Some(item) = ready!(first.poll_next(cx)) {
return Poll::Ready(Some(item));
}
+
+ this.first.set(None);
}
- this.first.set(None);
this.second.poll_next(cx)
}
diff --git a/vendor/futures-util/src/stream/stream/chunks.rs b/vendor/futures-util/src/stream/stream/chunks.rs
index 845786999..2a71ebc6c 100644
--- a/vendor/futures-util/src/stream/stream/chunks.rs
+++ b/vendor/futures-util/src/stream/stream/chunks.rs
@@ -21,10 +21,7 @@ pin_project! {
}
}
-impl<St: Stream> Chunks<St>
-where
- St: Stream,
-{
+impl<St: Stream> Chunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
@@ -66,7 +63,7 @@ impl<St: Stream> Stream for Chunks<St> {
let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(this.items, Vec::new());
+ let full_buf = mem::take(this.items);
Some(full_buf)
};
@@ -77,9 +74,9 @@ impl<St: Stream> Stream for Chunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
+ let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
diff --git a/vendor/futures-util/src/stream/stream/collect.rs b/vendor/futures-util/src/stream/stream/collect.rs
index b0e81b9ce..970ac26db 100644
--- a/vendor/futures-util/src/stream/stream/collect.rs
+++ b/vendor/futures-util/src/stream/stream/collect.rs
@@ -19,7 +19,7 @@ pin_project! {
impl<St: Stream, C: Default> Collect<St, C> {
fn finish(self: Pin<&mut Self>) -> C {
- mem::replace(self.project().collection, Default::default())
+ mem::take(self.project().collection)
}
pub(super) fn new(stream: St) -> Self {
diff --git a/vendor/futures-util/src/stream/stream/filter.rs b/vendor/futures-util/src/stream/stream/filter.rs
index ccf1a5122..997fe9977 100644
--- a/vendor/futures-util/src/stream/stream/filter.rs
+++ b/vendor/futures-util/src/stream/stream/filter.rs
@@ -93,7 +93,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/filter_map.rs b/vendor/futures-util/src/stream/stream/filter_map.rs
index 02a0a4386..6b7d0070d 100644
--- a/vendor/futures-util/src/stream/stream/filter_map.rs
+++ b/vendor/futures-util/src/stream/stream/filter_map.rs
@@ -87,7 +87,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/flatten_unordered.rs b/vendor/futures-util/src/stream/stream/flatten_unordered.rs
new file mode 100644
index 000000000..44c6ace2f
--- /dev/null
+++ b/vendor/futures-util/src/stream/stream/flatten_unordered.rs
@@ -0,0 +1,536 @@
+use alloc::sync::Arc;
+use core::{
+ cell::UnsafeCell,
+ convert::identity,
+ fmt,
+ marker::PhantomData,
+ num::NonZeroUsize,
+ pin::Pin,
+ sync::atomic::{AtomicU8, Ordering},
+};
+
+use pin_project_lite::pin_project;
+
+use futures_core::{
+ future::Future,
+ ready,
+ stream::{FusedStream, Stream},
+ task::{Context, Poll, Waker},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use futures_task::{waker, ArcWake};
+
+use crate::stream::FuturesUnordered;
+
+/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+/// method.
+pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>;
+
+/// There is nothing to poll and stream isn't being polled/waking/woken at the moment.
+const NONE: u8 = 0;
+
+/// Inner streams need to be polled.
+const NEED_TO_POLL_INNER_STREAMS: u8 = 1;
+
+/// The base stream needs to be polled.
+const NEED_TO_POLL_STREAM: u8 = 0b10;
+
+/// Both base stream and inner streams need to be polled.
+const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;
+
+/// The current stream is being polled at the moment.
+const POLLING: u8 = 0b100;
+
+/// Stream is being woken at the moment.
+const WAKING: u8 = 0b1000;
+
+/// The stream was waked and will be polled.
+const WOKEN: u8 = 0b10000;
+
+/// Internal polling state of the stream.
+#[derive(Clone, Debug)]
+struct SharedPollState {
+ state: Arc<AtomicU8>,
+}
+
+impl SharedPollState {
+ /// Constructs new `SharedPollState` with the given state.
+ fn new(value: u8) -> SharedPollState {
+ SharedPollState { state: Arc::new(AtomicU8::new(value)) }
+ }
+
+ /// Attempts to start polling, returning stored state in case of success.
+ /// Returns `None` if either waker is waking at the moment.
+ fn start_polling(
+ &self,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ if value & WAKING == NONE {
+ Some(POLLING)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+ let bomb = PollStateBomb::new(self, SharedPollState::reset);
+
+ Some((value, bomb))
+ }
+
+ /// Attempts to start the waking process and performs bitwise or with the given value.
+ ///
+ /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however
+ /// state will be disjuncted with the given value.
+ fn start_waking(
+ &self,
+ to_poll: u8,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ let mut next_value = value | to_poll;
+ if value & (WOKEN | POLLING) == NONE {
+ next_value |= WAKING;
+ }
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+
+ // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already
+ if value & (WOKEN | POLLING | WAKING) == NONE {
+ let bomb = PollStateBomb::new(self, SharedPollState::stop_waking);
+
+ Some((value, bomb))
+ } else {
+ None
+ }
+ }
+
+ /// Sets current state to
+ /// - `!POLLING` allowing to use wakers
+ /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,
+ /// or `will_be_woken` flag supplied
+ /// - `!WAKING` as
+ /// * Wakers called during the `POLLING` phase won't propagate their calls
+ /// * `POLLING` phase can't start if some of the wakers are active
+ /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
+ fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 {
+ self.state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| {
+ let mut next_value = to_poll;
+
+ value &= NEED_TO_POLL_ALL;
+ if value != NONE || will_be_woken {
+ next_value |= WOKEN;
+ }
+ next_value |= value;
+
+ Some(next_value & !POLLING & !WAKING)
+ })
+ .unwrap()
+ }
+
+ /// Toggles state to non-waking, allowing to start polling.
+ fn stop_waking(&self) -> u8 {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ let next_value = value & !WAKING | WOKEN;
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .unwrap_or_else(identity);
+
+ debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING);
+ value
+ }
+
+ /// Resets current state allowing to poll the stream and wake up wakers.
+ fn reset(&self) -> u8 {
+ self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
+ }
+}
+
+/// Used to execute some function on the given state when dropped.
+struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> {
+ state: &'a SharedPollState,
+ drop: Option<F>,
+}
+
+impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
+ /// Constructs new bomb with the given state.
+ fn new(state: &'a SharedPollState, drop: F) -> Self {
+ Self { state, drop: Some(drop) }
+ }
+
+ /// Deactivates bomb, forces it to not call provided function when dropped.
+ fn deactivate(mut self) {
+ self.drop.take();
+ }
+}
+
+impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
+ fn drop(&mut self) {
+ if let Some(drop) = self.drop.take() {
+ (drop)(self.state);
+ }
+ }
+}
+
+/// Will update state with the provided value on `wake_by_ref` call
+/// and then, if there is a need, call `inner_waker`.
+struct WrappedWaker {
+ inner_waker: UnsafeCell<Option<Waker>>,
+ poll_state: SharedPollState,
+ need_to_poll: u8,
+}
+
+unsafe impl Send for WrappedWaker {}
+unsafe impl Sync for WrappedWaker {}
+
+impl WrappedWaker {
+ /// Replaces given waker's inner_waker for polling stream/futures which will
+ /// update poll state on `wake_by_ref` call. Use only if you need several
+ /// contexts.
+ ///
+ /// ## Safety
+ ///
+ /// This function will modify waker's `inner_waker` via `UnsafeCell`, so
+ /// it should be used only during `POLLING` phase by one thread at the time.
+ unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) {
+ *self_arc.inner_waker.get() = cx.waker().clone().into();
+ }
+
+ /// Attempts to start the waking process for the waker with the given value.
+ /// If succeeded, then the stream isn't yet woken and not being polled at the moment.
+ fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ self.poll_state.start_waking(self.need_to_poll)
+ }
+}
+
+impl ArcWake for WrappedWaker {
+ fn wake_by_ref(self_arc: &Arc<Self>) {
+ if let Some((_, state_bomb)) = self_arc.start_waking() {
+ // Safety: now state is not `POLLING`
+ let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };
+
+ if let Some(inner_waker) = waker_opt.clone() {
+ // Stop waking to allow polling stream
+ drop(state_bomb);
+
+ // Wake up inner waker
+ inner_waker.wake();
+ }
+ }
+ }
+}
+
+pin_project! {
+ /// Future which polls optional inner stream.
+ ///
+ /// If it's `Some`, it will attempt to call `poll_next` on it,
+ /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))`
+ /// or `None` in case of `Poll::Ready(None)`.
+ ///
+ /// If `poll_next` will return `Poll::Pending`, it will be forwarded to
+ /// the future and current task will be notified by waker.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ struct PollStreamFut<St> {
+ #[pin]
+ stream: Option<St>,
+ }
+}
+
+impl<St> PollStreamFut<St> {
+ /// Constructs new `PollStreamFut` using given `stream`.
+ fn new(stream: impl Into<Option<St>>) -> Self {
+ Self { stream: stream.into() }
+ }
+}
+
+impl<St: Stream + Unpin> Future for PollStreamFut<St> {
+ type Output = Option<(St::Item, PollStreamFut<St>)>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut stream = self.project().stream;
+
+ let item = if let Some(stream) = stream.as_mut().as_pin_mut() {
+ ready!(stream.poll_next(cx))
+ } else {
+ None
+ };
+ let next_item_fut = PollStreamFut::new(stream.get_mut().take());
+ let out = item.map(|item| (item, next_item_fut));
+
+ Poll::Ready(out)
+ }
+}
+
+pin_project! {
+ /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+ /// method with ability to specify flow controller.
+ #[project = FlattenUnorderedWithFlowControllerProj]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream {
+ #[pin]
+ inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
+ #[pin]
+ stream: St,
+ poll_state: SharedPollState,
+ limit: Option<NonZeroUsize>,
+ is_stream_done: bool,
+ inner_streams_waker: Arc<WrappedWaker>,
+ stream_waker: Arc<WrappedWaker>,
+ flow_controller: PhantomData<Fc>
+ }
+}
+
+impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream + fmt::Debug,
+ St::Item: Stream + fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FlattenUnorderedWithFlowController")
+ .field("poll_state", &self.poll_state)
+ .field("inner_streams", &self.inner_streams)
+ .field("limit", &self.limit)
+ .field("stream", &self.stream)
+ .field("is_stream_done", &self.is_stream_done)
+ .field("flow_controller", &self.flow_controller)
+ .finish()
+ }
+}
+
+impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
+ St::Item: Stream + Unpin,
+{
+ pub(crate) fn new(
+ stream: St,
+ limit: Option<usize>,
+ ) -> FlattenUnorderedWithFlowController<St, Fc> {
+ let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
+
+ FlattenUnorderedWithFlowController {
+ inner_streams: FuturesUnordered::new(),
+ stream,
+ is_stream_done: false,
+ limit: limit.and_then(NonZeroUsize::new),
+ inner_streams_waker: Arc::new(WrappedWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_INNER_STREAMS,
+ }),
+ stream_waker: Arc::new(WrappedWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_STREAM,
+ }),
+ poll_state,
+ flow_controller: PhantomData,
+ }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+/// Returns the next flow step based on the received item.
+pub trait FlowController<I, O> {
+ /// Handles an item producing `FlowStep` describing the next flow step.
+ fn next_step(item: I) -> FlowStep<I, O>;
+}
+
+impl<I, O> FlowController<I, O> for () {
+ fn next_step(item: I) -> FlowStep<I, O> {
+ FlowStep::Continue(item)
+ }
+}
+
+/// Describes the next flow step.
+#[derive(Debug, Clone)]
+pub enum FlowStep<C, R> {
+ /// Just yields an item and continues standard flow.
+ Continue(C),
+ /// Immediately returns an underlying item from the function.
+ Return(R),
+}
+
+impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc>
+where
+ St: Stream,
+{
+ /// Checks if current `inner_streams` bucket size is greater than optional limit.
+ fn is_exceeded_limit(&self) -> bool {
+ self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
+ }
+}
+
+impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: FusedStream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
+ St::Item: Stream + Unpin,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated() && self.inner_streams.is_empty()
+ }
+}
+
+impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
+ St::Item: Stream + Unpin,
+{
+ type Item = <St::Item as Stream>::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut next_item = None;
+ let mut need_to_poll_next = NONE;
+
+ let mut this = self.as_mut().project();
+
+ // Attempt to start polling, in case some waker is holding the lock, wait in loop
+ let (mut poll_state_value, state_bomb) = loop {
+ if let Some(value) = this.poll_state.start_polling() {
+ break value;
+ }
+ };
+
+ // Safety: now state is `POLLING`.
+ unsafe {
+ WrappedWaker::replace_waker(this.stream_waker, cx);
+ WrappedWaker::replace_waker(this.inner_streams_waker, cx)
+ };
+
+ if poll_state_value & NEED_TO_POLL_STREAM != NONE {
+ let mut stream_waker = None;
+
+ // Here we need to poll the base stream.
+ //
+ // To improve performance, we will attempt to place as many items as we can
+ // to the `FuturesUnordered` bucket before polling inner streams
+ loop {
+ if this.is_exceeded_limit() || *this.is_stream_done {
+ // We either exceeded the limit or the stream is exhausted
+ if !*this.is_stream_done {
+ // The stream needs to be polled in the next iteration
+ need_to_poll_next |= NEED_TO_POLL_STREAM;
+ }
+
+ break;
+ } else {
+ let mut cx = Context::from_waker(
+ stream_waker.get_or_insert_with(|| waker(this.stream_waker.clone())),
+ );
+
+ match this.stream.as_mut().poll_next(&mut cx) {
+ Poll::Ready(Some(item)) => {
+ let next_item_fut = match Fc::next_step(item) {
+ // Propagates an item immediately (the main use-case is for errors)
+ FlowStep::Return(item) => {
+ need_to_poll_next |= NEED_TO_POLL_STREAM
+ | (poll_state_value & NEED_TO_POLL_INNER_STREAMS);
+ poll_state_value &= !NEED_TO_POLL_INNER_STREAMS;
+
+ next_item = Some(item);
+
+ break;
+ }
+ // Yields an item and continues processing (normal case)
+ FlowStep::Continue(inner_stream) => {
+ PollStreamFut::new(inner_stream)
+ }
+ };
+ // Add new stream to the inner streams bucket
+ this.inner_streams.as_mut().push(next_item_fut);
+ // Inner streams must be polled afterward
+ poll_state_value |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(None) => {
+ // Mark the base stream as done
+ *this.is_stream_done = true;
+ }
+ Poll::Pending => {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE {
+ let inner_streams_waker = waker(this.inner_streams_waker.clone());
+ let mut cx = Context::from_waker(&inner_streams_waker);
+
+ match this.inner_streams.as_mut().poll_next(&mut cx) {
+ Poll::Ready(Some(Some((item, next_item_fut)))) => {
+ // Push next inner stream item future to the list of inner streams futures
+ this.inner_streams.as_mut().push(next_item_fut);
+ // Take the received item
+ next_item = Some(item);
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(Some(None)) => {
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ _ => {}
+ }
+ }
+
+ // We didn't have any `poll_next` panic, so it's time to deactivate the bomb
+ state_bomb.deactivate();
+
+ // Call the waker at the end of polling if
+ let mut force_wake =
+ // we need to poll the stream and didn't reach the limit yet
+ need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
+ // or we need to poll the inner streams again
+ || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;
+
+ // Stop polling and swap the latest state
+ poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake);
+ // If state was changed during `POLLING` phase, we also need to manually call a waker
+ force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;
+
+ let is_done = *this.is_stream_done && this.inner_streams.is_empty();
+
+ if next_item.is_some() || is_done {
+ Poll::Ready(next_item)
+ } else {
+ if force_wake {
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream + Sink<Item>,
+{
+ type Error = St::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/vendor/futures-util/src/stream/stream/mod.rs b/vendor/futures-util/src/stream/stream/mod.rs
index 9cfcc09ba..558dc22bd 100644
--- a/vendor/futures-util/src/stream/stream/mod.rs
+++ b/vendor/futures-util/src/stream/stream/mod.rs
@@ -199,6 +199,25 @@ pub use self::buffered::Buffered;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
+pub(crate) mod flatten_unordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)]
+pub use self::flatten_unordered::FlattenUnordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+delegate_all!(
+ /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
+ FlatMapUnordered<St, U, F>(
+ FlattenUnordered<Map<St, F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
+ where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
+);
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
mod for_each_concurrent;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
@@ -390,9 +409,9 @@ pub trait StreamExt: Stream {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
+ /// let events = stream.filter(|x| future::ready(x % 2 == 0));
///
- /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
@@ -422,11 +441,11 @@ pub trait StreamExt: Stream {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter_map(|x| async move {
+ /// let events = stream.filter_map(|x| async move {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
///
- /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
@@ -754,13 +773,64 @@ pub trait StreamExt: Stream {
assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
}
+ /// Flattens a stream of streams into just one continuous stream. Polls
+ /// inner streams produced by the base stream concurrently.
+ ///
+ /// The only argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::StreamExt;
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(1).unwrap();
+ /// tx1.unbounded_send(2).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(3).unwrap();
+ /// tx2.unbounded_send(4).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(rx1).unwrap();
+ /// tx3.unbounded_send(rx2).unwrap();
+ /// });
+ ///
+ /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
+ /// output.sort();
+ ///
+ /// assert_eq!(output, vec![1, 2, 3, 4]);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
+ where
+ Self::Item: Stream + Unpin,
+ Self: Sized,
+ {
+ assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
+ }
+
/// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
///
/// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
/// you would have to chain combinators like `.map(f).flatten()` while this
/// combinator provides ability to write `.flat_map(f)` instead of chaining.
///
- /// The provided closure which produce inner streams is executed over all elements
+ /// The provided closure which produces inner streams is executed over all elements
/// of stream as last inner stream is terminated and next stream item is available.
///
/// Note that this function consumes the stream passed into it and returns a
@@ -788,6 +858,59 @@ pub trait StreamExt: Stream {
assert_stream::<U::Item, _>(FlatMap::new(self, f))
}
+ /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
+ /// and polls them concurrently, yielding items in any order, as they made
+ /// available.
+ ///
+ /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
+ /// instead, and you need to poll all of them concurrently, you would
+ /// have to use something like `for_each_concurrent` and merge values
+ /// by hand. This combinator provides ability to collect all values
+ /// from concurrently polled streams into one stream.
+ ///
+ /// The first argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// The provided closure which produces inner streams is executed over
+ /// all elements of stream as next stream item is available and limit
+ /// of concurrently processed streams isn't exceeded.
+ ///
+ /// Note that this function consumes the stream passed into it and
+ /// returns a wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..5);
+ /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
+ /// let mut values = stream.collect::<Vec<_>>().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flat_map_unordered<U, F>(
+ self,
+ limit: impl Into<Option<usize>>,
+ f: F,
+ ) -> FlatMapUnordered<Self, U, F>
+ where
+ U: Stream + Unpin,
+ F: FnMut(Self::Item) -> U,
+ Self: Sized,
+ {
+ assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f))
+ }
+
/// Combinator similar to [`StreamExt::fold`] that holds internal state
/// and produces a new stream.
///
@@ -1397,8 +1520,7 @@ pub trait StreamExt: Stream {
/// be immediately returned.
///
/// If the underlying stream ended and only a partial vector was created,
- /// it'll be returned. Additionally if an error happens from the underlying
- /// stream then the currently buffered items will be yielded.
+ /// it will be returned.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
@@ -1558,6 +1680,8 @@ pub trait StreamExt: Stream {
/// assert_eq!(total, 6);
/// # });
/// ```
+ ///
+ /// [`select!`]: crate::select
fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
where
Self: Unpin + FusedStream,
diff --git a/vendor/futures-util/src/stream/stream/peek.rs b/vendor/futures-util/src/stream/stream/peek.rs
index c72dfc366..ea3d6243f 100644
--- a/vendor/futures-util/src/stream/stream/peek.rs
+++ b/vendor/futures-util/src/stream/stream/peek.rs
@@ -204,7 +204,7 @@ impl<S: Stream> Stream for Peekable<S> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let peek_len = if self.peeked.is_some() { 1 } else { 0 };
+ let peek_len = usize::from(self.peeked.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(peek_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/stream/ready_chunks.rs b/vendor/futures-util/src/stream/stream/ready_chunks.rs
index 5ebc9582d..192054c4a 100644
--- a/vendor/futures-util/src/stream/stream/ready_chunks.rs
+++ b/vendor/futures-util/src/stream/stream/ready_chunks.rs
@@ -1,6 +1,5 @@
-use crate::stream::Fuse;
+use crate::stream::{Fuse, StreamExt};
use alloc::vec::Vec;
-use core::mem;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
@@ -15,23 +14,15 @@ pin_project! {
pub struct ReadyChunks<St: Stream> {
#[pin]
stream: Fuse<St>,
- items: Vec<St::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}
-impl<St: Stream> ReadyChunks<St>
-where
- St: Stream,
-{
+impl<St: Stream> ReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
- Self {
- stream: super::Fuse::new(stream),
- items: Vec::with_capacity(capacity),
- cap: capacity,
- }
+ Self { stream: stream.fuse(), cap: capacity }
}
delegate_access_inner!(stream, St, (.));
@@ -43,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
+ let mut items: Vec<St::Item> = Vec::new();
+
loop {
match this.stream.as_mut().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
- return if this.items.is_empty() {
- Poll::Pending
- } else {
- Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
- }
+ return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) }
}
// Push the ready item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
- this.items.push(item);
- if this.items.len() >= *this.cap {
- return Poll::Ready(Some(mem::replace(
- this.items,
- Vec::with_capacity(*this.cap),
- )));
+ if items.is_empty() {
+ items.reserve(*this.cap);
+ }
+ items.push(item);
+ if items.len() >= *this.cap {
+ return Poll::Ready(Some(items));
}
}
// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
- let last = if this.items.is_empty() {
- None
- } else {
- let full_buf = mem::replace(this.items, Vec::new());
- Some(full_buf)
- };
+ let last = if items.is_empty() { None } else { Some(items) };
return Poll::Ready(last);
}
@@ -85,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
- let upper = match upper {
- Some(x) => x.checked_add(chunk_len),
- None => None,
- };
+ let lower = lower / self.cap;
(lower, upper)
}
}
-impl<St: FusedStream> FusedStream for ReadyChunks<St> {
+impl<St: Stream> FusedStream for ReadyChunks<St> {
fn is_terminated(&self) -> bool {
- self.stream.is_terminated() && self.items.is_empty()
+ self.stream.is_terminated()
}
}
diff --git a/vendor/futures-util/src/stream/stream/skip_while.rs b/vendor/futures-util/src/stream/stream/skip_while.rs
index 50a21a21a..dabd5eefa 100644
--- a/vendor/futures-util/src/stream/stream/skip_while.rs
+++ b/vendor/futures-util/src/stream/stream/skip_while.rs
@@ -99,7 +99,7 @@ where
if self.done_skipping {
self.stream.size_hint()
} else {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/split.rs b/vendor/futures-util/src/stream/stream/split.rs
index 3a72fee30..e2034e0c2 100644
--- a/vendor/futures-util/src/stream/stream/split.rs
+++ b/vendor/futures-util/src/stream/stream/split.rs
@@ -35,7 +35,7 @@ impl<S: Stream> Stream for SplitStream<S> {
}
}
-#[allow(bad_style)]
+#[allow(non_snake_case)]
fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
SplitSink { lock, slot: None }
}
diff --git a/vendor/futures-util/src/stream/stream/take.rs b/vendor/futures-util/src/stream/stream/take.rs
index b1c728e33..29d6c39ee 100644
--- a/vendor/futures-util/src/stream/stream/take.rs
+++ b/vendor/futures-util/src/stream/stream/take.rs
@@ -54,11 +54,11 @@ where
let (lower, upper) = self.stream.size_hint();
- let lower = cmp::min(lower, self.remaining as usize);
+ let lower = cmp::min(lower, self.remaining);
let upper = match upper {
- Some(x) if x < self.remaining as usize => Some(x),
- _ => Some(self.remaining as usize),
+ Some(x) if x < self.remaining => Some(x),
+ _ => Some(self.remaining),
};
(lower, upper)
diff --git a/vendor/futures-util/src/stream/stream/take_while.rs b/vendor/futures-util/src/stream/stream/take_while.rs
index 01b27654b..925694301 100644
--- a/vendor/futures-util/src/stream/stream/take_while.rs
+++ b/vendor/futures-util/src/stream/stream/take_while.rs
@@ -91,7 +91,7 @@ where
return (0, Some(0));
}
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/then.rs b/vendor/futures-util/src/stream/stream/then.rs
index d4531d4b9..9192c0b0c 100644
--- a/vendor/futures-util/src/stream/stream/then.rs
+++ b/vendor/futures-util/src/stream/stream/then.rs
@@ -78,7 +78,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/stream/unzip.rs b/vendor/futures-util/src/stream/stream/unzip.rs
index 15f22e80b..a88cf0326 100644
--- a/vendor/futures-util/src/stream/stream/unzip.rs
+++ b/vendor/futures-util/src/stream/stream/unzip.rs
@@ -21,7 +21,7 @@ pin_project! {
impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> {
fn finish(self: Pin<&mut Self>) -> (FromA, FromB) {
let this = self.project();
- (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default()))
+ (mem::take(this.left), mem::take(this.right))
}
pub(super) fn new(stream: St) -> Self {
diff --git a/vendor/futures-util/src/stream/stream/zip.rs b/vendor/futures-util/src/stream/stream/zip.rs
index 360a8b63b..25a47e96b 100644
--- a/vendor/futures-util/src/stream/stream/zip.rs
+++ b/vendor/futures-util/src/stream/stream/zip.rs
@@ -102,8 +102,8 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let queued1_len = if self.queued1.is_some() { 1 } else { 0 };
- let queued2_len = if self.queued2.is_some() { 1 } else { 0 };
+ let queued1_len = usize::from(self.queued1.is_some());
+ let queued2_len = usize::from(self.queued2.is_some());
let (stream1_lower, stream1_upper) = self.stream1.size_hint();
let (stream2_lower, stream2_upper) = self.stream2.size_hint();
diff --git a/vendor/futures-util/src/stream/try_stream/and_then.rs b/vendor/futures-util/src/stream/try_stream/and_then.rs
index a7b50db0b..2f8b6f258 100644
--- a/vendor/futures-util/src/stream/try_stream/and_then.rs
+++ b/vendor/futures-util/src/stream/try_stream/and_then.rs
@@ -71,7 +71,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/try_stream/into_async_read.rs b/vendor/futures-util/src/stream/try_stream/into_async_read.rs
index 914b277a0..ffbfc7eae 100644
--- a/vendor/futures-util/src/stream/try_stream/into_async_read.rs
+++ b/vendor/futures-util/src/stream/try_stream/into_async_read.rs
@@ -1,30 +1,26 @@
-use crate::stream::TryStreamExt;
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use pin_project_lite::pin_project;
use std::cmp;
use std::io::{Error, Result};
-/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
-#[derive(Debug)]
-#[must_use = "readers do nothing unless polled"]
-#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
-pub struct IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
- stream: St,
- state: ReadState<St::Ok>,
-}
-
-impl<St> Unpin for IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
+pin_project! {
+ /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
+ #[derive(Debug)]
+ #[must_use = "readers do nothing unless polled"]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
+ pub struct IntoAsyncRead<St>
+ where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+ {
+ #[pin]
+ stream: St,
+ state: ReadState<St::Ok>,
+ }
}
#[derive(Debug)]
@@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {
impl<St> IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
pub(super) fn new(stream: St) -> Self {
@@ -46,16 +42,18 @@ where
impl<St> AsyncRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_read(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
+ let mut this = self.project();
+
loop {
- match &mut self.state {
+ match this.state {
ReadState::Ready { chunk, chunk_start } => {
let chunk = chunk.as_ref();
let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
@@ -64,23 +62,23 @@ where
*chunk_start += len;
if chunk.len() == *chunk_start {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
return Poll::Ready(Ok(len));
}
- ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
+ ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(0));
}
},
@@ -94,51 +92,52 @@ where
impl<St> AsyncWrite for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + AsyncWrite + Unpin,
+ St: TryStream<Error = Error> + AsyncWrite,
St::Ok: AsRef<[u8]>,
{
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize>> {
- Pin::new(&mut self.stream).poll_write(cx, buf)
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
+ let this = self.project();
+ this.stream.poll_write(cx, buf)
}
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_flush(cx)
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_flush(cx)
}
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_close(cx)
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_close(cx)
}
}
impl<St> AsyncBufRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
- fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
- while let ReadState::PendingChunk = self.state {
- match ready!(self.stream.try_poll_next_unpin(cx)) {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
+ let mut this = self.project();
+
+ while let ReadState::PendingChunk = this.state {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(&[]));
}
}
}
- if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
+ if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
let chunk = chunk.as_ref();
return Poll::Ready(Ok(&chunk[chunk_start..]));
}
@@ -147,16 +146,18 @@ where
Poll::Ready(Ok(&[]))
}
- fn consume(mut self: Pin<&mut Self>, amount: usize) {
+ fn consume(self: Pin<&mut Self>, amount: usize) {
+ let this = self.project();
+
// https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
if amount == 0 {
return;
}
- if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
+ if let ReadState::Ready { chunk, chunk_start } = this.state {
*chunk_start += amount;
debug_assert!(*chunk_start <= chunk.as_ref().len());
if *chunk_start >= chunk.as_ref().len() {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
} else {
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
diff --git a/vendor/futures-util/src/stream/try_stream/mod.rs b/vendor/futures-util/src/stream/try_stream/mod.rs
index 455ddca3f..414a40dbe 100644
--- a/vendor/futures-util/src/stream/try_stream/mod.rs
+++ b/vendor/futures-util/src/stream/try_stream/mod.rs
@@ -15,6 +15,7 @@ use crate::stream::{Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
+
use futures_core::{
future::{Future, TryFuture},
stream::TryStream,
@@ -88,6 +89,14 @@ mod try_flatten;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_flatten::TryFlatten;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_flatten_unordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_flatten_unordered::TryFlattenUnordered;
+
mod try_collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_collect::TryCollect;
@@ -711,6 +720,63 @@ pub trait TryStreamExt: TryStream {
assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
}
+ /// Flattens a stream of streams into just one continuous stream. Produced streams
+ /// will be polled concurrently and any errors will be passed through without looking at them.
+ /// If the underlying base stream returns an error, it will be **immediately** propagated.
+ ///
+ /// The only argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(Ok(1)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(Ok(2)).unwrap();
+ /// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(Ok(rx1)).unwrap();
+ /// tx3.unbounded_send(Ok(rx2)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
+ /// });
+ ///
+ /// let stream = rx3.try_flatten_unordered(None);
+ /// let mut values: Vec<_> = stream.collect().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
+ where
+ Self::Ok: TryStream + Unpin,
+ <Self::Ok as TryStream>::Error: From<Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
+ TryFlattenUnordered::new(self, limit),
+ )
+ }
+
/// Flattens a stream of streams into just one continuous stream.
///
/// If this stream's elements are themselves streams then this combinator
@@ -736,17 +802,21 @@ pub trait TryStreamExt: TryStream {
/// thread::spawn(move || {
/// tx2.unbounded_send(Ok(2)).unwrap();
/// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
/// });
/// thread::spawn(move || {
/// tx3.unbounded_send(Ok(rx1)).unwrap();
/// tx3.unbounded_send(Ok(rx2)).unwrap();
- /// tx3.unbounded_send(Err(4)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
/// });
///
/// let mut stream = rx3.try_flatten();
/// assert_eq!(stream.next().await, Some(Ok(1)));
/// assert_eq!(stream.next().await, Some(Ok(2)));
/// assert_eq!(stream.next().await, Some(Err(3)));
+ /// assert_eq!(stream.next().await, Some(Ok(4)));
+ /// assert_eq!(stream.next().await, Some(Err(5)));
+ /// assert_eq!(stream.next().await, None);
/// # });
/// ```
fn try_flatten(self) -> TryFlatten<Self>
@@ -914,7 +984,7 @@ pub trait TryStreamExt: TryStream {
/// that matches the stream's `Error` type.
///
/// This adaptor will buffer up to `n` futures and then return their
- /// outputs in the order. If the underlying stream returns an error, it will
+ /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
/// be immediately propagated.
///
/// The returned stream will be a stream of results, each containing either
@@ -1001,6 +1071,7 @@ pub trait TryStreamExt: TryStream {
/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
/// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::future::{FutureExt, TryFutureExt};
/// # let (tx, rx) = futures::channel::oneshot::channel();
///
@@ -1026,12 +1097,7 @@ pub trait TryStreamExt: TryStream {
Compat::new(self)
}
- /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
- ///
- /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
- /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
- /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
- /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
+ /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
///
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
@@ -1043,12 +1109,12 @@ pub trait TryStreamExt: TryStream {
/// use futures::stream::{self, TryStreamExt};
/// use futures::io::AsyncReadExt;
///
- /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
+ /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
/// let mut reader = stream.into_async_read();
- /// let mut buf = Vec::new();
///
- /// assert!(reader.read_to_end(&mut buf).await.is_ok());
- /// assert_eq!(buf, &[1, 2, 3, 4, 5]);
+ /// let mut buf = Vec::new();
+ /// reader.read_to_end(&mut buf).await.unwrap();
+ /// assert_eq!(buf, [1, 2, 3, 4, 5]);
/// # })
/// ```
#[cfg(feature = "io")]
@@ -1056,7 +1122,7 @@ pub trait TryStreamExt: TryStream {
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
- Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
+ Self: Sized + TryStreamExt<Error = std::io::Error>,
Self::Ok: AsRef<[u8]>,
{
crate::io::assert_read(IntoAsyncRead::new(self))
diff --git a/vendor/futures-util/src/stream/try_stream/or_else.rs b/vendor/futures-util/src/stream/try_stream/or_else.rs
index cb69e8132..53aceb8e6 100644
--- a/vendor/futures-util/src/stream/try_stream/or_else.rs
+++ b/vendor/futures-util/src/stream/try_stream/or_else.rs
@@ -75,7 +75,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/try_stream/try_buffered.rs b/vendor/futures-util/src/stream/try_stream/try_buffered.rs
index 45bd3f8c7..9f48e5c0a 100644
--- a/vendor/futures-util/src/stream/try_stream/try_buffered.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_buffered.rs
@@ -54,7 +54,7 @@ where
// our queue of futures. Propagate errors from the stream immediately.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx)? {
- Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}
diff --git a/vendor/futures-util/src/stream/try_stream/try_chunks.rs b/vendor/futures-util/src/stream/try_stream/try_chunks.rs
index 07d4425a8..ec53f4bd1 100644
--- a/vendor/futures-util/src/stream/try_stream/try_chunks.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_chunks.rs
@@ -41,9 +41,10 @@ impl<St: TryStream> TryChunks<St> {
delegate_access_inner!(stream, St, (. .));
}
+type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
+
impl<St: TryStream> Stream for TryChunks<St> {
- #[allow(clippy::type_complexity)]
- type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
+ type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
@@ -70,7 +71,7 @@ impl<St: TryStream> Stream for TryChunks<St> {
let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(this.items, Vec::new());
+ let full_buf = mem::take(this.items);
Some(full_buf)
};
@@ -81,9 +82,9 @@ impl<St: TryStream> Stream for TryChunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
+ let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
diff --git a/vendor/futures-util/src/stream/try_stream/try_collect.rs b/vendor/futures-util/src/stream/try_stream/try_collect.rs
index 5d3b3d766..3e5963f03 100644
--- a/vendor/futures-util/src/stream/try_stream/try_collect.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_collect.rs
@@ -45,7 +45,7 @@ where
Poll::Ready(Ok(loop {
match ready!(this.stream.as_mut().try_poll_next(cx)?) {
Some(x) => this.items.extend(Some(x)),
- None => break mem::replace(this.items, Default::default()),
+ None => break mem::take(this.items),
}
}))
}
diff --git a/vendor/futures-util/src/stream/try_stream/try_filter.rs b/vendor/futures-util/src/stream/try_stream/try_filter.rs
index 61e6105c3..11d58243f 100644
--- a/vendor/futures-util/src/stream/try_stream/try_filter.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_filter.rs
@@ -90,7 +90,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_fut.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_fut.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs
index bb1b5b9db..ed1201732 100644
--- a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs
@@ -84,7 +84,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs
new file mode 100644
index 000000000..a74dfc451
--- /dev/null
+++ b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs
@@ -0,0 +1,176 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+use pin_project_lite::pin_project;
+
+use crate::future::Either;
+use crate::stream::stream::flatten_unordered::{
+ FlattenUnorderedWithFlowController, FlowController, FlowStep,
+};
+use crate::stream::IntoStream;
+use crate::TryStreamExt;
+
+delegate_all!(
+ /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
+ TryFlattenUnordered<St>(
+ FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ + New[
+ |stream: St, limit: impl Into<Option<usize>>|
+ FlattenUnorderedWithFlowController::new(
+ NestedTryStreamIntoEitherTryStream::new(stream),
+ limit.into()
+ )
+ ]
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+);
+
+pin_project! {
+ /// Emits either successful streams or single-item streams containing the underlying errors.
+ /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct NestedTryStreamIntoEitherTryStream<St>
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+ {
+ #[pin]
+ stream: St
+ }
+}
+
+impl<St> NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn new(stream: St) -> Self {
+ Self { stream }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+/// Emits a single item immediately, then stream will be terminated.
+#[derive(Debug, Clone)]
+pub struct Single<T>(Option<T>);
+
+impl<T> Single<T> {
+ /// Constructs new `Single` with the given value.
+ fn new(val: T) -> Self {
+ Self(Some(val))
+ }
+
+ /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated.
+ fn next_immediate(&mut self) -> Option<T> {
+ self.0.take()
+ }
+}
+
+impl<T> Unpin for Single<T> {}
+
+impl<T> Stream for Single<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(self.0.take())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
+ }
+}
+
+/// Immediately propagates errors occurred in the base stream.
+#[derive(Debug, Clone, Copy)]
+pub struct PropagateBaseStreamError<St>(PhantomData<St>);
+
+type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
+type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item;
+
+impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
+ match item {
+ // A new successful inner stream received
+ st @ Either::Left(_) => FlowStep::Continue(st),
+ // An error encountered
+ Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
+ }
+ }
+}
+
+type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;
+
+impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ // Item is either an inner stream or a stream containing a single error.
+ // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
+ type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let item = ready!(self.project().stream.try_poll_next(cx));
+
+ let out = match item {
+ Some(res) => match res {
+ // Emit successful inner stream as is
+ Ok(stream) => Either::Left(stream.into_stream()),
+ // Wrap an error into a stream containing a single item
+ err @ Err(_) => {
+ let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
+
+ Either::Right(Single::new(res))
+ }
+ },
+ None => return Poll::Ready(None),
+ };
+
+ Poll::Ready(Some(out))
+ }
+}
+
+impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + FusedStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + Sink<Item>,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
+{
+ type Error = <St as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs
index a424b6c5b..52aa2d478 100644
--- a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs
@@ -87,7 +87,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/try_stream/try_take_while.rs b/vendor/futures-util/src/stream/try_stream/try_take_while.rs
index 3375960ef..4b5ff1ad3 100644
--- a/vendor/futures-util/src/stream/try_stream/try_take_while.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_take_while.rs
@@ -96,7 +96,7 @@ where
return (0, Some(0));
}
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),