summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/fs/file.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/fs/file.rs')
-rw-r--r--third_party/rust/tokio/src/fs/file.rs779
1 files changed, 779 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/fs/file.rs b/third_party/rust/tokio/src/fs/file.rs
new file mode 100644
index 0000000000..2c38e8059f
--- /dev/null
+++ b/third_party/rust/tokio/src/fs/file.rs
@@ -0,0 +1,779 @@
+//! Types for working with [`File`].
+//!
+//! [`File`]: File
+
+use self::State::*;
+use crate::fs::asyncify;
+use crate::io::blocking::Buf;
+use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
+use crate::sync::Mutex;
+
+use std::fmt;
+use std::fs::{Metadata, Permissions};
+use std::future::Future;
+use std::io::{self, Seek, SeekFrom};
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use std::task::Poll::*;
+
+#[cfg(test)]
+use super::mocks::JoinHandle;
+#[cfg(test)]
+use super::mocks::MockFile as StdFile;
+#[cfg(test)]
+use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
+#[cfg(not(test))]
+use crate::blocking::JoinHandle;
+#[cfg(not(test))]
+use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
+#[cfg(not(test))]
+use std::fs::File as StdFile;
+
+/// A reference to an open file on the filesystem.
+///
+/// This is a specialized version of [`std::fs::File`][std] for usage from the
+/// Tokio runtime.
+///
+/// An instance of a `File` can be read and/or written depending on what options
+/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
+/// cursor that the file contains internally.
+///
+/// A file will not be closed immediately when it goes out of scope if there
+/// are any IO operations that have not yet completed. To ensure that a file is
+/// closed immediately when it is dropped, you should call [`flush`] before
+/// dropping it. Note that this does not ensure that the file has been fully
+/// written to disk; the operating system might keep the changes around in an
+/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
+/// the data to disk.
+///
+/// Reading and writing to a `File` is usually done using the convenience
+/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
+///
+/// [std]: struct@std::fs::File
+/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
+/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
+/// [`sync_all`]: fn@crate::fs::File::sync_all
+/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+///
+/// # Examples
+///
+/// Create a new file and asynchronously write bytes to it:
+///
+/// ```no_run
+/// use tokio::fs::File;
+/// use tokio::io::AsyncWriteExt; // for write_all()
+///
+/// # async fn dox() -> std::io::Result<()> {
+/// let mut file = File::create("foo.txt").await?;
+/// file.write_all(b"hello, world!").await?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Read the contents of a file into a buffer:
+///
+/// ```no_run
+/// use tokio::fs::File;
+/// use tokio::io::AsyncReadExt; // for read_to_end()
+///
+/// # async fn dox() -> std::io::Result<()> {
+/// let mut file = File::open("foo.txt").await?;
+///
+/// let mut contents = vec![];
+/// file.read_to_end(&mut contents).await?;
+///
+/// println!("len = {}", contents.len());
+/// # Ok(())
+/// # }
+/// ```
+pub struct File {
+ std: Arc<StdFile>,
+ inner: Mutex<Inner>,
+}
+
+struct Inner {
+ state: State,
+
+ /// Errors from writes/flushes are returned in write/flush calls. If a write
+ /// error is observed while performing a read, it is saved until the next
+ /// write / flush call.
+ last_write_err: Option<io::ErrorKind>,
+
+ pos: u64,
+}
+
+#[derive(Debug)]
+enum State {
+ Idle(Option<Buf>),
+ Busy(JoinHandle<(Operation, Buf)>),
+}
+
+#[derive(Debug)]
+enum Operation {
+ Read(io::Result<usize>),
+ Write(io::Result<()>),
+ Seek(io::Result<u64>),
+}
+
+impl File {
+ /// Attempts to open a file in read-only mode.
+ ///
+ /// See [`OpenOptions`] for more details.
+ ///
+ /// [`OpenOptions`]: super::OpenOptions
+ ///
+ /// # Errors
+ ///
+ /// This function will return an error if called from outside of the Tokio
+ /// runtime or if path does not already exist. Other errors may also be
+ /// returned according to OpenOptions::open.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::io::AsyncReadExt;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::open("foo.txt").await?;
+ ///
+ /// let mut contents = vec![];
+ /// file.read_to_end(&mut contents).await?;
+ ///
+ /// println!("len = {}", contents.len());
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
+ ///
+ /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
+ /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+ pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
+ let path = path.as_ref().to_owned();
+ let std = asyncify(|| StdFile::open(path)).await?;
+
+ Ok(File::from_std(std))
+ }
+
+ /// Opens a file in write-only mode.
+ ///
+ /// This function will create a file if it does not exist, and will truncate
+ /// it if it does.
+ ///
+ /// See [`OpenOptions`] for more details.
+ ///
+ /// [`OpenOptions`]: super::OpenOptions
+ ///
+ /// # Errors
+ ///
+ /// Results in an error if called from outside of the Tokio runtime or if
+ /// the underlying [`create`] call results in an error.
+ ///
+ /// [`create`]: std::fs::File::create
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::io::AsyncWriteExt;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
+ ///
+ /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
+ /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+ pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
+ let path = path.as_ref().to_owned();
+ let std_file = asyncify(move || StdFile::create(path)).await?;
+ Ok(File::from_std(std_file))
+ }
+
+ /// Converts a [`std::fs::File`][std] to a [`tokio::fs::File`][file].
+ ///
+ /// [std]: std::fs::File
+ /// [file]: File
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// // This line could block. It is not recommended to do this on the Tokio
+ /// // runtime.
+ /// let std_file = std::fs::File::open("foo.txt").unwrap();
+ /// let file = tokio::fs::File::from_std(std_file);
+ /// ```
+ pub fn from_std(std: StdFile) -> File {
+ File {
+ std: Arc::new(std),
+ inner: Mutex::new(Inner {
+ state: State::Idle(Some(Buf::with_capacity(0))),
+ last_write_err: None,
+ pos: 0,
+ }),
+ }
+ }
+
+ /// Attempts to sync all OS-internal metadata to disk.
+ ///
+ /// This function will attempt to ensure that all in-core data reaches the
+ /// filesystem before returning.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::io::AsyncWriteExt;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// file.sync_all().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
+ ///
+ /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
+ /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+ pub async fn sync_all(&self) -> io::Result<()> {
+ let mut inner = self.inner.lock().await;
+ inner.complete_inflight().await;
+
+ let std = self.std.clone();
+ asyncify(move || std.sync_all()).await
+ }
+
+ /// This function is similar to `sync_all`, except that it may not
+ /// synchronize file metadata to the filesystem.
+ ///
+ /// This is intended for use cases that must synchronize content, but don't
+ /// need the metadata on disk. The goal of this method is to reduce disk
+ /// operations.
+ ///
+ /// Note that some platforms may simply implement this in terms of `sync_all`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::io::AsyncWriteExt;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// file.sync_data().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
+ ///
+ /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
+ /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+ pub async fn sync_data(&self) -> io::Result<()> {
+ let mut inner = self.inner.lock().await;
+ inner.complete_inflight().await;
+
+ let std = self.std.clone();
+ asyncify(move || std.sync_data()).await
+ }
+
+ /// Truncates or extends the underlying file, updating the size of this file to become size.
+ ///
+ /// If the size is less than the current file's size, then the file will be
+ /// shrunk. If it is greater than the current file's size, then the file
+ /// will be extended to size and have all of the intermediate data filled in
+ /// with 0s.
+ ///
+ /// # Errors
+ ///
+ /// This function will return an error if the file is not opened for
+ /// writing.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::io::AsyncWriteExt;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// file.set_len(10).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
+ ///
+ /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
+ /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+ pub async fn set_len(&self, size: u64) -> io::Result<()> {
+ let mut inner = self.inner.lock().await;
+ inner.complete_inflight().await;
+
+ let mut buf = match inner.state {
+ Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
+ _ => unreachable!(),
+ };
+
+ let seek = if !buf.is_empty() {
+ Some(SeekFrom::Current(buf.discard_read()))
+ } else {
+ None
+ };
+
+ let std = self.std.clone();
+
+ inner.state = Busy(spawn_blocking(move || {
+ let res = if let Some(seek) = seek {
+ (&*std).seek(seek).and_then(|_| std.set_len(size))
+ } else {
+ std.set_len(size)
+ }
+ .map(|_| 0); // the value is discarded later
+
+ // Return the result as a seek
+ (Operation::Seek(res), buf)
+ }));
+
+ let (op, buf) = match inner.state {
+ Idle(_) => unreachable!(),
+ Busy(ref mut rx) => rx.await?,
+ };
+
+ inner.state = Idle(Some(buf));
+
+ match op {
+ Operation::Seek(res) => res.map(|pos| {
+ inner.pos = pos;
+ }),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Queries metadata about the underlying file.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let file = File::open("foo.txt").await?;
+ /// let metadata = file.metadata().await?;
+ ///
+ /// println!("{:?}", metadata);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn metadata(&self) -> io::Result<Metadata> {
+ let std = self.std.clone();
+ asyncify(move || std.metadata()).await
+ }
+
+ /// Creates a new `File` instance that shares the same underlying file handle
+ /// as the existing `File` instance. Reads, writes, and seeks will affect both
+ /// File instances simultaneously.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let file = File::open("foo.txt").await?;
+ /// let file_clone = file.try_clone().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn try_clone(&self) -> io::Result<File> {
+ let std = self.std.clone();
+ let std_file = asyncify(move || std.try_clone()).await?;
+ Ok(File::from_std(std_file))
+ }
+
+ /// Destructures `File` into a [`std::fs::File`][std]. This function is
+ /// async to allow any in-flight operations to complete.
+ ///
+ /// Use `File::try_into_std` to attempt conversion immediately.
+ ///
+ /// [std]: std::fs::File
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let tokio_file = File::open("foo.txt").await?;
+ /// let std_file = tokio_file.into_std().await;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn into_std(mut self) -> StdFile {
+ self.inner.get_mut().complete_inflight().await;
+ Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
+ }
+
+ /// Tries to immediately destructure `File` into a [`std::fs::File`][std].
+ ///
+ /// [std]: std::fs::File
+ ///
+ /// # Errors
+ ///
+ /// This function will return an error containing the file if some
+ /// operation is in-flight.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let tokio_file = File::open("foo.txt").await?;
+ /// let std_file = tokio_file.try_into_std().unwrap();
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_into_std(mut self) -> Result<StdFile, Self> {
+ match Arc::try_unwrap(self.std) {
+ Ok(file) => Ok(file),
+ Err(std_file_arc) => {
+ self.std = std_file_arc;
+ Err(self)
+ }
+ }
+ }
+
+ /// Changes the permissions on the underlying file.
+ ///
+ /// # Platform-specific behavior
+ ///
+ /// This function currently corresponds to the `fchmod` function on Unix and
+ /// the `SetFileInformationByHandle` function on Windows. Note that, this
+ /// [may change in the future][changes].
+ ///
+ /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
+ ///
+ /// # Errors
+ ///
+ /// This function will return an error if the user lacks permission change
+ /// attributes on the underlying file. It may also return an error in other
+ /// os-specific unspecified cases.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let file = File::open("foo.txt").await?;
+ /// let mut perms = file.metadata().await?.permissions();
+ /// perms.set_readonly(true);
+ /// file.set_permissions(perms).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
+ let std = self.std.clone();
+ asyncify(move || std.set_permissions(perm)).await
+ }
+}
+
+impl AsyncRead for File {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ dst: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ let me = self.get_mut();
+ let inner = me.inner.get_mut();
+
+ loop {
+ match inner.state {
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ if !buf.is_empty() {
+ buf.copy_to(dst);
+ *buf_cell = Some(buf);
+ return Ready(Ok(()));
+ }
+
+ buf.ensure_capacity_for(dst);
+ let std = me.std.clone();
+
+ inner.state = Busy(spawn_blocking(move || {
+ let res = buf.read_from(&mut &*std);
+ (Operation::Read(res), buf)
+ }));
+ }
+ Busy(ref mut rx) => {
+ let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
+
+ match op {
+ Operation::Read(Ok(_)) => {
+ buf.copy_to(dst);
+ inner.state = Idle(Some(buf));
+ return Ready(Ok(()));
+ }
+ Operation::Read(Err(e)) => {
+ assert!(buf.is_empty());
+
+ inner.state = Idle(Some(buf));
+ return Ready(Err(e));
+ }
+ Operation::Write(Ok(_)) => {
+ assert!(buf.is_empty());
+ inner.state = Idle(Some(buf));
+ continue;
+ }
+ Operation::Write(Err(e)) => {
+ assert!(inner.last_write_err.is_none());
+ inner.last_write_err = Some(e.kind());
+ inner.state = Idle(Some(buf));
+ }
+ Operation::Seek(result) => {
+ assert!(buf.is_empty());
+ inner.state = Idle(Some(buf));
+ if let Ok(pos) = result {
+ inner.pos = pos;
+ }
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+impl AsyncSeek for File {
+ fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
+ let me = self.get_mut();
+ let inner = me.inner.get_mut();
+
+ loop {
+ match inner.state {
+ Busy(_) => panic!("must wait for poll_complete before calling start_seek"),
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ // Factor in any unread data from the buf
+ if !buf.is_empty() {
+ let n = buf.discard_read();
+
+ if let SeekFrom::Current(ref mut offset) = pos {
+ *offset += n;
+ }
+ }
+
+ let std = me.std.clone();
+
+ inner.state = Busy(spawn_blocking(move || {
+ let res = (&*std).seek(pos);
+ (Operation::Seek(res), buf)
+ }));
+ return Ok(());
+ }
+ }
+ }
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ let inner = self.inner.get_mut();
+
+ loop {
+ match inner.state {
+ Idle(_) => return Poll::Ready(Ok(inner.pos)),
+ Busy(ref mut rx) => {
+ let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
+ inner.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => {}
+ Operation::Write(Err(e)) => {
+ assert!(inner.last_write_err.is_none());
+ inner.last_write_err = Some(e.kind());
+ }
+ Operation::Write(_) => {}
+ Operation::Seek(res) => {
+ if let Ok(pos) = res {
+ inner.pos = pos;
+ }
+ return Ready(res);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+impl AsyncWrite for File {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ src: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let me = self.get_mut();
+ let inner = me.inner.get_mut();
+
+ if let Some(e) = inner.last_write_err.take() {
+ return Ready(Err(e.into()));
+ }
+
+ loop {
+ match inner.state {
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ let seek = if !buf.is_empty() {
+ Some(SeekFrom::Current(buf.discard_read()))
+ } else {
+ None
+ };
+
+ let n = buf.copy_from(src);
+ let std = me.std.clone();
+
+ let blocking_task_join_handle = spawn_mandatory_blocking(move || {
+ let res = if let Some(seek) = seek {
+ (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
+ } else {
+ buf.write_to(&mut &*std)
+ };
+
+ (Operation::Write(res), buf)
+ })
+ .ok_or_else(|| {
+ io::Error::new(io::ErrorKind::Other, "background task failed")
+ })?;
+
+ inner.state = Busy(blocking_task_join_handle);
+
+ return Ready(Ok(n));
+ }
+ Busy(ref mut rx) => {
+ let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
+ inner.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => {
+ // We don't care about the result here. The fact
+ // that the cursor has advanced will be reflected in
+ // the next iteration of the loop
+ continue;
+ }
+ Operation::Write(res) => {
+ // If the previous write was successful, continue.
+ // Otherwise, error.
+ res?;
+ continue;
+ }
+ Operation::Seek(_) => {
+ // Ignore the seek
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ let inner = self.inner.get_mut();
+ inner.poll_flush(cx)
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ self.poll_flush(cx)
+ }
+}
+
+impl From<StdFile> for File {
+ fn from(std: StdFile) -> Self {
+ Self::from_std(std)
+ }
+}
+
+impl fmt::Debug for File {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("tokio::fs::File")
+ .field("std", &self.std)
+ .finish()
+ }
+}
+
+#[cfg(unix)]
+impl std::os::unix::io::AsRawFd for File {
+ fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
+ self.std.as_raw_fd()
+ }
+}
+
+#[cfg(unix)]
+impl std::os::unix::io::FromRawFd for File {
+ unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
+ StdFile::from_raw_fd(fd).into()
+ }
+}
+
+#[cfg(windows)]
+impl std::os::windows::io::AsRawHandle for File {
+ fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
+ self.std.as_raw_handle()
+ }
+}
+
+#[cfg(windows)]
+impl std::os::windows::io::FromRawHandle for File {
+ unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
+ StdFile::from_raw_handle(handle).into()
+ }
+}
+
+impl Inner {
+ async fn complete_inflight(&mut self) {
+ use crate::future::poll_fn;
+
+ if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
+ self.last_write_err = Some(e.kind());
+ }
+ }
+
+ fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ if let Some(e) = self.last_write_err.take() {
+ return Ready(Err(e.into()));
+ }
+
+ let (op, buf) = match self.state {
+ Idle(_) => return Ready(Ok(())),
+ Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
+ };
+
+ // The buffer is not used here
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => Ready(Ok(())),
+ Operation::Write(res) => Ready(res),
+ Operation::Seek(_) => Ready(Ok(())),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests;