summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/src/compat/compat03as01.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-util/src/compat/compat03as01.rs')
-rw-r--r--third_party/rust/futures-util/src/compat/compat03as01.rs265
1 files changed, 265 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/compat/compat03as01.rs b/third_party/rust/futures-util/src/compat/compat03as01.rs
new file mode 100644
index 0000000000..5d3a6e920b
--- /dev/null
+++ b/third_party/rust/futures-util/src/compat/compat03as01.rs
@@ -0,0 +1,265 @@
+use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
+use futures_01::{
+ task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
+};
+#[cfg(feature = "sink")]
+use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01};
+use futures_core::{
+ future::TryFuture as TryFuture03,
+ stream::TryStream as TryStream03,
+ task::{RawWaker, RawWakerVTable},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink as Sink03;
+#[cfg(feature = "sink")]
+use std::marker::PhantomData;
+use std::{mem, pin::Pin, sync::Arc, task::Context};
+
+/// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture) or
+/// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1
+/// [`Future`](futures_01::future::Future) or
+/// [`Stream`](futures_01::stream::Stream).
+#[derive(Debug, Clone, Copy)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Compat<T> {
+ pub(crate) inner: T,
+}
+
+/// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1
+/// [`Sink`](futures_01::sink::Sink).
+#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct CompatSink<T, Item> {
+ inner: T,
+ _phantom: PhantomData<fn(Item)>,
+}
+
+impl<T> Compat<T> {
+ /// Creates a new [`Compat`].
+ ///
+ /// For types which implement appropriate futures `0.3`
+ /// traits, the result will be a type which implements
+ /// the corresponding futures 0.1 type.
+ pub fn new(inner: T) -> Self {
+ Self { inner }
+ }
+
+ /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
+ /// contained within.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
+ /// contained within.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Returns the inner item.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<T, Item> CompatSink<T, Item> {
+ /// Creates a new [`CompatSink`].
+ pub fn new(inner: T) -> Self {
+ Self { inner, _phantom: PhantomData }
+ }
+
+ /// Get a reference to 0.3 Sink contained within.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Get a mutable reference to 0.3 Sink contained within.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Returns the inner item.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
+ match x? {
+ task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
+ task03::Poll::Pending => Ok(Async01::NotReady),
+ }
+}
+
+impl<Fut> Future01 for Compat<Fut>
+where
+ Fut: TryFuture03 + Unpin,
+{
+ type Item = Fut::Ok;
+ type Error = Fut::Error;
+
+ fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
+ with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
+ }
+}
+
+impl<St> Stream01 for Compat<St>
+where
+ St: TryStream03 + Unpin,
+{
+ type Item = St::Ok;
+ type Error = St::Error;
+
+ fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
+ with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
+ task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
+ task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
+ task03::Poll::Pending => Ok(Async01::NotReady),
+ })
+ }
+}
+
+#[cfg(feature = "sink")]
+impl<T, Item> Sink01 for CompatSink<T, Item>
+where
+ T: Sink03<Item> + Unpin,
+{
+ type SinkItem = Item;
+ type SinkError = T::Error;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> {
+ with_sink_context(self, |mut inner, cx| match inner.as_mut().poll_ready(cx)? {
+ task03::Poll::Ready(()) => inner.start_send(item).map(|()| AsyncSink01::Ready),
+ task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
+ })
+ }
+
+ fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
+ with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_flush(cx)))
+ }
+
+ fn close(&mut self) -> Poll01<(), Self::SinkError> {
+ with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_close(cx)))
+ }
+}
+
+#[derive(Clone)]
+struct Current(task01::Task);
+
+impl Current {
+ fn new() -> Self {
+ Self(task01::current())
+ }
+
+ fn as_waker(&self) -> WakerRef<'_> {
+ unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
+ &*(ptr as *const Current)
+ }
+ fn current_to_ptr(current: &Current) -> *const () {
+ current as *const Current as *const ()
+ }
+
+ unsafe fn clone(ptr: *const ()) -> RawWaker {
+ // Lazily create the `Arc` only when the waker is actually cloned.
+ // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
+ // function is landed in `core`.
+ mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new(
+ ptr_to_current(ptr).clone(),
+ )))
+ }
+ unsafe fn drop(_: *const ()) {}
+ unsafe fn wake(ptr: *const ()) {
+ ptr_to_current(ptr).0.notify()
+ }
+
+ let ptr = current_to_ptr(self);
+ let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
+ WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
+ task03::Waker::from_raw(RawWaker::new(ptr, vtable))
+ }))
+ }
+}
+
+impl ArcWake03 for Current {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.0.notify();
+ }
+}
+
+fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
+where
+ T: Unpin,
+ F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
+{
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ f(Pin::new(&mut compat.inner), &mut cx)
+}
+
+#[cfg(feature = "sink")]
+fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
+where
+ T: Unpin,
+ F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
+{
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ f(Pin::new(&mut compat.inner), &mut cx)
+}
+
+#[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+mod io {
+ use super::*;
+ use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
+ use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
+
+ fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
+ match x {
+ task03::Poll::Ready(Ok(t)) => Ok(t),
+ task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
+ task03::Poll::Ready(Err(e)) => Err(e),
+ }
+ }
+
+ impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_io(Pin::new(&mut self.inner).poll_read(&mut cx, buf))
+ }
+ }
+
+ impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}
+
+ impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_io(Pin::new(&mut self.inner).poll_write(&mut cx, buf))
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_io(Pin::new(&mut self.inner).poll_flush(&mut cx))
+ }
+ }
+
+ impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
+ fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
+ let current = Current::new();
+ let waker = current.as_waker();
+ let mut cx = Context::from_waker(&waker);
+ poll_03_to_01(Pin::new(&mut self.inner).poll_close(&mut cx))
+ }
+ }
+}