diff options
Diffstat (limited to 'third_party/rust/tokio/src/fs')
28 files changed, 3872 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/fs/canonicalize.rs b/third_party/rust/tokio/src/fs/canonicalize.rs new file mode 100644 index 0000000000..403662685c --- /dev/null +++ b/third_party/rust/tokio/src/fs/canonicalize.rs @@ -0,0 +1,51 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::{Path, PathBuf}; + +/// Returns the canonical, absolute form of a path with all intermediate +/// components normalized and symbolic links resolved. +/// +/// This is an async version of [`std::fs::canonicalize`][std] +/// +/// [std]: std::fs::canonicalize +/// +/// # Platform-specific behavior +/// +/// This function currently corresponds to the `realpath` function on Unix +/// and the `CreateFile` and `GetFinalPathNameByHandle` functions on Windows. +/// Note that, this [may change in the future][changes]. +/// +/// On Windows, this converts the path to use [extended length path][path] +/// syntax, which allows your program to use longer path names, but means you +/// can only join backslash-delimited paths to it, and it may be incompatible +/// with other applications (if passed to the application on the command-line, +/// or written to a file another application may read). +/// +/// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior +/// [path]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath +/// +/// # Errors +/// +/// This function will return an error in the following situations, but is not +/// limited to just these cases: +/// +/// * `path` does not exist. +/// * A non-final component in path is not a directory. +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// use std::io; +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let path = fs::canonicalize("../a/../foo.txt").await?; +/// Ok(()) +/// } +/// ``` +pub async fn canonicalize(path: impl AsRef<Path>) -> io::Result<PathBuf> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::canonicalize(path)).await +} diff --git a/third_party/rust/tokio/src/fs/copy.rs b/third_party/rust/tokio/src/fs/copy.rs new file mode 100644 index 0000000000..b47f287285 --- /dev/null +++ b/third_party/rust/tokio/src/fs/copy.rs @@ -0,0 +1,27 @@ +use crate::fs::asyncify; +use std::path::Path; + +/// Copies the contents of one file to another. This function will also copy the permission bits +/// of the original file to the destination file. +/// This function will overwrite the contents of to. +/// +/// This is the async equivalent of [`std::fs::copy`][std]. +/// +/// [std]: fn@std::fs::copy +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// +/// # async fn dox() -> std::io::Result<()> { +/// fs::copy("foo.txt", "bar.txt").await?; +/// # Ok(()) +/// # } +/// ``` + +pub async fn copy(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<u64, std::io::Error> { + let from = from.as_ref().to_owned(); + let to = to.as_ref().to_owned(); + asyncify(|| std::fs::copy(from, to)).await +} diff --git a/third_party/rust/tokio/src/fs/create_dir.rs b/third_party/rust/tokio/src/fs/create_dir.rs new file mode 100644 index 0000000000..411969500f --- /dev/null +++ b/third_party/rust/tokio/src/fs/create_dir.rs @@ -0,0 +1,52 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Creates a new, empty directory at the provided path. +/// +/// This is an async version of [`std::fs::create_dir`][std] +/// +/// [std]: std::fs::create_dir +/// +/// # Platform-specific behavior +/// +/// This function currently corresponds to the `mkdir` function on Unix +/// and the `CreateDirectory` function on Windows. +/// Note that, this [may change in the future][changes]. +/// +/// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior +/// +/// **NOTE**: If a parent of the given path doesn't exist, this function will +/// return an error. To create a directory and all its missing parents at the +/// same time, use the [`create_dir_all`] function. +/// +/// # Errors +/// +/// This function will return an error in the following situations, but is not +/// limited to just these cases: +/// +/// * User lacks permissions to create directory at `path`. +/// * A parent of the given path doesn't exist. (To create a directory and all +/// its missing parents at the same time, use the [`create_dir_all`] +/// function.) +/// * `path` already exists. +/// +/// [`create_dir_all`]: super::create_dir_all() +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// use std::io; +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// fs::create_dir("/some/dir").await?; +/// Ok(()) +/// } +/// ``` +pub async fn create_dir(path: impl AsRef<Path>) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::create_dir(path)).await +} diff --git a/third_party/rust/tokio/src/fs/create_dir_all.rs b/third_party/rust/tokio/src/fs/create_dir_all.rs new file mode 100644 index 0000000000..21f0c82d11 --- /dev/null +++ b/third_party/rust/tokio/src/fs/create_dir_all.rs @@ -0,0 +1,53 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Recursively creates a directory and all of its parent components if they +/// are missing. +/// +/// This is an async version of [`std::fs::create_dir_all`][std] +/// +/// [std]: std::fs::create_dir_all +/// +/// # Platform-specific behavior +/// +/// This function currently corresponds to the `mkdir` function on Unix +/// and the `CreateDirectory` function on Windows. +/// Note that, this [may change in the future][changes]. +/// +/// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior +/// +/// # Errors +/// +/// This function will return an error in the following situations, but is not +/// limited to just these cases: +/// +/// * If any directory in the path specified by `path` does not already exist +/// and it could not be created otherwise. The specific error conditions for +/// when a directory is being created (after it is determined to not exist) are +/// outlined by [`fs::create_dir`]. +/// +/// Notable exception is made for situations where any of the directories +/// specified in the `path` could not be created as it was being created concurrently. +/// Such cases are considered to be successful. That is, calling `create_dir_all` +/// concurrently from multiple threads or processes is guaranteed not to fail +/// due to a race condition with itself. +/// +/// [`fs::create_dir`]: std::fs::create_dir +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// +/// #[tokio::main] +/// async fn main() -> std::io::Result<()> { +/// fs::create_dir_all("/some/dir").await?; +/// Ok(()) +/// } +/// ``` +pub async fn create_dir_all(path: impl AsRef<Path>) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::create_dir_all(path)).await +} diff --git a/third_party/rust/tokio/src/fs/dir_builder.rs b/third_party/rust/tokio/src/fs/dir_builder.rs new file mode 100644 index 0000000000..97168bff70 --- /dev/null +++ b/third_party/rust/tokio/src/fs/dir_builder.rs @@ -0,0 +1,137 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// A builder for creating directories in various manners. +/// +/// This is a specialized version of [`std::fs::DirBuilder`] for usage on +/// the Tokio runtime. +/// +/// [std::fs::DirBuilder]: std::fs::DirBuilder +#[derive(Debug, Default)] +pub struct DirBuilder { + /// Indicates whether to create parent directories if they are missing. + recursive: bool, + + /// Sets the Unix mode for newly created directories. + #[cfg(unix)] + pub(super) mode: Option<u32>, +} + +impl DirBuilder { + /// Creates a new set of options with default mode/security settings for all + /// platforms and also non-recursive. + /// + /// This is an async version of [`std::fs::DirBuilder::new`][std] + /// + /// [std]: std::fs::DirBuilder::new + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::DirBuilder; + /// + /// let builder = DirBuilder::new(); + /// ``` + pub fn new() -> Self { + Default::default() + } + + /// Indicates whether to create directories recursively (including all parent directories). + /// Parents that do not exist are created with the same security and permissions settings. + /// + /// This option defaults to `false`. + /// + /// This is an async version of [`std::fs::DirBuilder::recursive`][std] + /// + /// [std]: std::fs::DirBuilder::recursive + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::DirBuilder; + /// + /// let mut builder = DirBuilder::new(); + /// builder.recursive(true); + /// ``` + pub fn recursive(&mut self, recursive: bool) -> &mut Self { + self.recursive = recursive; + self + } + + /// Creates the specified directory with the configured options. + /// + /// It is considered an error if the directory already exists unless + /// recursive mode is enabled. + /// + /// This is an async version of [`std::fs::DirBuilder::create`][std] + /// + /// [std]: std::fs::DirBuilder::create + /// + /// # Errors + /// + /// An error will be returned under the following circumstances: + /// + /// * Path already points to an existing file. + /// * Path already points to an existing directory and the mode is + /// non-recursive. + /// * The calling process doesn't have permissions to create the directory + /// or its missing parents. + /// * Other I/O error occurred. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::DirBuilder; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// DirBuilder::new() + /// .recursive(true) + /// .create("/tmp/foo/bar/baz") + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub async fn create(&self, path: impl AsRef<Path>) -> io::Result<()> { + let path = path.as_ref().to_owned(); + let mut builder = std::fs::DirBuilder::new(); + builder.recursive(self.recursive); + + #[cfg(unix)] + { + if let Some(mode) = self.mode { + std::os::unix::fs::DirBuilderExt::mode(&mut builder, mode); + } + } + + asyncify(move || builder.create(path)).await + } +} + +feature! { + #![unix] + + impl DirBuilder { + /// Sets the mode to create new directories with. + /// + /// This option defaults to 0o777. + /// + /// # Examples + /// + /// + /// ```no_run + /// use tokio::fs::DirBuilder; + /// + /// let mut builder = DirBuilder::new(); + /// builder.mode(0o775); + /// ``` + pub fn mode(&mut self, mode: u32) -> &mut Self { + self.mode = Some(mode); + self + } + } +} diff --git a/third_party/rust/tokio/src/fs/file.rs b/third_party/rust/tokio/src/fs/file.rs new file mode 100644 index 0000000000..74f91958d0 --- /dev/null +++ b/third_party/rust/tokio/src/fs/file.rs @@ -0,0 +1,820 @@ +//! Types for working with [`File`]. +//! +//! [`File`]: File + +use self::State::*; +use crate::fs::asyncify; +use crate::io::blocking::Buf; +use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; +use crate::sync::Mutex; + +use std::fmt; +use std::fs::{Metadata, Permissions}; +use std::future::Future; +use std::io::{self, Seek, SeekFrom}; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use std::task::Poll::*; + +#[cfg(test)] +use super::mocks::JoinHandle; +#[cfg(test)] +use super::mocks::MockFile as StdFile; +#[cfg(test)] +use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; +#[cfg(not(test))] +use crate::blocking::JoinHandle; +#[cfg(not(test))] +use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; +#[cfg(not(test))] +use std::fs::File as StdFile; + +/// A reference to an open file on the filesystem. +/// +/// This is a specialized version of [`std::fs::File`][std] for usage from the +/// Tokio runtime. +/// +/// An instance of a `File` can be read and/or written depending on what options +/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical +/// cursor that the file contains internally. +/// +/// A file will not be closed immediately when it goes out of scope if there +/// are any IO operations that have not yet completed. To ensure that a file is +/// closed immediately when it is dropped, you should call [`flush`] before +/// dropping it. Note that this does not ensure that the file has been fully +/// written to disk; the operating system might keep the changes around in an +/// in-memory buffer. See the [`sync_all`] method for telling the OS to write +/// the data to disk. +/// +/// Reading and writing to a `File` is usually done using the convenience +/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. +/// +/// [std]: struct@std::fs::File +/// [`AsyncSeek`]: trait@crate::io::AsyncSeek +/// [`flush`]: fn@crate::io::AsyncWriteExt::flush +/// [`sync_all`]: fn@crate::fs::File::sync_all +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// +/// # Examples +/// +/// Create a new file and asynchronously write bytes to it: +/// +/// ```no_run +/// use tokio::fs::File; +/// use tokio::io::AsyncWriteExt; // for write_all() +/// +/// # async fn dox() -> std::io::Result<()> { +/// let mut file = File::create("foo.txt").await?; +/// file.write_all(b"hello, world!").await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// Read the contents of a file into a buffer: +/// +/// ```no_run +/// use tokio::fs::File; +/// use tokio::io::AsyncReadExt; // for read_to_end() +/// +/// # async fn dox() -> std::io::Result<()> { +/// let mut file = File::open("foo.txt").await?; +/// +/// let mut contents = vec![]; +/// file.read_to_end(&mut contents).await?; +/// +/// println!("len = {}", contents.len()); +/// # Ok(()) +/// # } +/// ``` +pub struct File { + std: Arc<StdFile>, + inner: Mutex<Inner>, +} + +struct Inner { + state: State, + + /// Errors from writes/flushes are returned in write/flush calls. If a write + /// error is observed while performing a read, it is saved until the next + /// write / flush call. + last_write_err: Option<io::ErrorKind>, + + pos: u64, +} + +#[derive(Debug)] +enum State { + Idle(Option<Buf>), + Busy(JoinHandle<(Operation, Buf)>), +} + +#[derive(Debug)] +enum Operation { + Read(io::Result<usize>), + Write(io::Result<()>), + Seek(io::Result<u64>), +} + +impl File { + /// Attempts to open a file in read-only mode. + /// + /// See [`OpenOptions`] for more details. + /// + /// [`OpenOptions`]: super::OpenOptions + /// + /// # Errors + /// + /// This function will return an error if called from outside of the Tokio + /// runtime or if path does not already exist. Other errors may also be + /// returned according to OpenOptions::open. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncReadExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// + /// let mut contents = vec![]; + /// file.read_to_end(&mut contents).await?; + /// + /// println!("len = {}", contents.len()); + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait. + /// + /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end + /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt + pub async fn open(path: impl AsRef<Path>) -> io::Result<File> { + let path = path.as_ref().to_owned(); + let std = asyncify(|| StdFile::open(path)).await?; + + Ok(File::from_std(std)) + } + + /// Opens a file in write-only mode. + /// + /// This function will create a file if it does not exist, and will truncate + /// it if it does. + /// + /// See [`OpenOptions`] for more details. + /// + /// [`OpenOptions`]: super::OpenOptions + /// + /// # Errors + /// + /// Results in an error if called from outside of the Tokio runtime or if + /// the underlying [`create`] call results in an error. + /// + /// [`create`]: std::fs::File::create + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn create(path: impl AsRef<Path>) -> io::Result<File> { + let path = path.as_ref().to_owned(); + let std_file = asyncify(move || StdFile::create(path)).await?; + Ok(File::from_std(std_file)) + } + + /// Converts a [`std::fs::File`][std] to a [`tokio::fs::File`][file]. + /// + /// [std]: std::fs::File + /// [file]: File + /// + /// # Examples + /// + /// ```no_run + /// // This line could block. It is not recommended to do this on the Tokio + /// // runtime. + /// let std_file = std::fs::File::open("foo.txt").unwrap(); + /// let file = tokio::fs::File::from_std(std_file); + /// ``` + pub fn from_std(std: StdFile) -> File { + File { + std: Arc::new(std), + inner: Mutex::new(Inner { + state: State::Idle(Some(Buf::with_capacity(0))), + last_write_err: None, + pos: 0, + }), + } + } + + /// Attempts to sync all OS-internal metadata to disk. + /// + /// This function will attempt to ensure that all in-core data reaches the + /// filesystem before returning. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.sync_all().await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn sync_all(&self) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; + + let std = self.std.clone(); + asyncify(move || std.sync_all()).await + } + + /// This function is similar to `sync_all`, except that it may not + /// synchronize file metadata to the filesystem. + /// + /// This is intended for use cases that must synchronize content, but don't + /// need the metadata on disk. The goal of this method is to reduce disk + /// operations. + /// + /// Note that some platforms may simply implement this in terms of `sync_all`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.sync_data().await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn sync_data(&self) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; + + let std = self.std.clone(); + asyncify(move || std.sync_data()).await + } + + /// Truncates or extends the underlying file, updating the size of this file to become size. + /// + /// If the size is less than the current file's size, then the file will be + /// shrunk. If it is greater than the current file's size, then the file + /// will be extended to size and have all of the intermediate data filled in + /// with 0s. + /// + /// # Errors + /// + /// This function will return an error if the file is not opened for + /// writing. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.set_len(10).await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn set_len(&self, size: u64) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; + + let mut buf = match inner.state { + Idle(ref mut buf_cell) => buf_cell.take().unwrap(), + _ => unreachable!(), + }; + + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) + } else { + None + }; + + let std = self.std.clone(); + + inner.state = Busy(spawn_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| std.set_len(size)) + } else { + std.set_len(size) + } + .map(|_| 0); // the value is discarded later + + // Return the result as a seek + (Operation::Seek(res), buf) + })); + + let (op, buf) = match inner.state { + Idle(_) => unreachable!(), + Busy(ref mut rx) => rx.await?, + }; + + inner.state = Idle(Some(buf)); + + match op { + Operation::Seek(res) => res.map(|pos| { + inner.pos = pos; + }), + _ => unreachable!(), + } + } + + /// Queries metadata about the underlying file. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let metadata = file.metadata().await?; + /// + /// println!("{:?}", metadata); + /// # Ok(()) + /// # } + /// ``` + pub async fn metadata(&self) -> io::Result<Metadata> { + let std = self.std.clone(); + asyncify(move || std.metadata()).await + } + + /// Creates a new `File` instance that shares the same underlying file handle + /// as the existing `File` instance. Reads, writes, and seeks will affect both + /// File instances simultaneously. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let file_clone = file.try_clone().await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn try_clone(&self) -> io::Result<File> { + self.inner.lock().await.complete_inflight().await; + let std = self.std.clone(); + let std_file = asyncify(move || std.try_clone()).await?; + Ok(File::from_std(std_file)) + } + + /// Destructures `File` into a [`std::fs::File`][std]. This function is + /// async to allow any in-flight operations to complete. + /// + /// Use `File::try_into_std` to attempt conversion immediately. + /// + /// [std]: std::fs::File + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let tokio_file = File::open("foo.txt").await?; + /// let std_file = tokio_file.into_std().await; + /// # Ok(()) + /// # } + /// ``` + pub async fn into_std(mut self) -> StdFile { + self.inner.get_mut().complete_inflight().await; + Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed") + } + + /// Tries to immediately destructure `File` into a [`std::fs::File`][std]. + /// + /// [std]: std::fs::File + /// + /// # Errors + /// + /// This function will return an error containing the file if some + /// operation is in-flight. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let tokio_file = File::open("foo.txt").await?; + /// let std_file = tokio_file.try_into_std().unwrap(); + /// # Ok(()) + /// # } + /// ``` + pub fn try_into_std(mut self) -> Result<StdFile, Self> { + match Arc::try_unwrap(self.std) { + Ok(file) => Ok(file), + Err(std_file_arc) => { + self.std = std_file_arc; + Err(self) + } + } + } + + /// Changes the permissions on the underlying file. + /// + /// # Platform-specific behavior + /// + /// This function currently corresponds to the `fchmod` function on Unix and + /// the `SetFileInformationByHandle` function on Windows. Note that, this + /// [may change in the future][changes]. + /// + /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior + /// + /// # Errors + /// + /// This function will return an error if the user lacks permission change + /// attributes on the underlying file. It may also return an error in other + /// os-specific unspecified cases. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let mut perms = file.metadata().await?.permissions(); + /// perms.set_readonly(true); + /// file.set_permissions(perms).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { + let std = self.std.clone(); + asyncify(move || std.set_permissions(perm)).await + } +} + +impl AsyncRead for File { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + ready!(crate::trace::trace_leaf(cx)); + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + if !buf.is_empty() { + buf.copy_to(dst); + *buf_cell = Some(buf); + return Ready(Ok(())); + } + + buf.ensure_capacity_for(dst); + let std = me.std.clone(); + + inner.state = Busy(spawn_blocking(move || { + let res = buf.read_from(&mut &*std); + (Operation::Read(res), buf) + })); + } + Busy(ref mut rx) => { + let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match op { + Operation::Read(Ok(_)) => { + buf.copy_to(dst); + inner.state = Idle(Some(buf)); + return Ready(Ok(())); + } + Operation::Read(Err(e)) => { + assert!(buf.is_empty()); + + inner.state = Idle(Some(buf)); + return Ready(Err(e)); + } + Operation::Write(Ok(_)) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + continue; + } + Operation::Write(Err(e)) => { + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); + inner.state = Idle(Some(buf)); + } + Operation::Seek(result) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + if let Ok(pos) = result { + inner.pos = pos; + } + continue; + } + } + } + } + } + } +} + +impl AsyncSeek for File { + fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + match inner.state { + Busy(_) => Err(io::Error::new( + io::ErrorKind::Other, + "other file operation is pending, call poll_complete before start_seek", + )), + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + // Factor in any unread data from the buf + if !buf.is_empty() { + let n = buf.discard_read(); + + if let SeekFrom::Current(ref mut offset) = pos { + *offset += n; + } + } + + let std = me.std.clone(); + + inner.state = Busy(spawn_blocking(move || { + let res = (&*std).seek(pos); + (Operation::Seek(res), buf) + })); + Ok(()) + } + } + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + ready!(crate::trace::trace_leaf(cx)); + let inner = self.inner.get_mut(); + + loop { + match inner.state { + Idle(_) => return Poll::Ready(Ok(inner.pos)), + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + inner.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => {} + Operation::Write(Err(e)) => { + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); + } + Operation::Write(_) => {} + Operation::Seek(res) => { + if let Ok(pos) = res { + inner.pos = pos; + } + return Ready(res); + } + } + } + } + } + } +} + +impl AsyncWrite for File { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + src: &[u8], + ) -> Poll<io::Result<usize>> { + ready!(crate::trace::trace_leaf(cx)); + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + if let Some(e) = inner.last_write_err.take() { + return Ready(Err(e.into())); + } + + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) + } else { + None + }; + + let n = buf.copy_from(src); + let std = me.std.clone(); + + let blocking_task_join_handle = spawn_mandatory_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; + + (Operation::Write(res), buf) + }) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "background task failed") + })?; + + inner.state = Busy(blocking_task_join_handle); + + return Ready(Ok(n)); + } + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + inner.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => { + // We don't care about the result here. The fact + // that the cursor has advanced will be reflected in + // the next iteration of the loop + continue; + } + Operation::Write(res) => { + // If the previous write was successful, continue. + // Otherwise, error. + res?; + continue; + } + Operation::Seek(_) => { + // Ignore the seek + continue; + } + } + } + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + ready!(crate::trace::trace_leaf(cx)); + let inner = self.inner.get_mut(); + inner.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + ready!(crate::trace::trace_leaf(cx)); + self.poll_flush(cx) + } +} + +impl From<StdFile> for File { + fn from(std: StdFile) -> Self { + Self::from_std(std) + } +} + +impl fmt::Debug for File { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("tokio::fs::File") + .field("std", &self.std) + .finish() + } +} + +#[cfg(unix)] +impl std::os::unix::io::AsRawFd for File { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.std.as_raw_fd() + } +} + +#[cfg(all(unix, not(tokio_no_as_fd)))] +impl std::os::unix::io::AsFd for File { + fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> { + unsafe { + std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self)) + } + } +} + +#[cfg(unix)] +impl std::os::unix::io::FromRawFd for File { + unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self { + StdFile::from_raw_fd(fd).into() + } +} + +cfg_windows! { + use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; + #[cfg(not(tokio_no_as_fd))] + use crate::os::windows::io::{AsHandle, BorrowedHandle}; + + impl AsRawHandle for File { + fn as_raw_handle(&self) -> RawHandle { + self.std.as_raw_handle() + } + } + + #[cfg(not(tokio_no_as_fd))] + impl AsHandle for File { + fn as_handle(&self) -> BorrowedHandle<'_> { + unsafe { + BorrowedHandle::borrow_raw( + AsRawHandle::as_raw_handle(self), + ) + } + } + } + + impl FromRawHandle for File { + unsafe fn from_raw_handle(handle: RawHandle) -> Self { + StdFile::from_raw_handle(handle).into() + } + } +} + +impl Inner { + async fn complete_inflight(&mut self) { + use crate::future::poll_fn; + + poll_fn(|cx| self.poll_complete_inflight(cx)).await + } + + fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> { + ready!(crate::trace::trace_leaf(cx)); + match self.poll_flush(cx) { + Poll::Ready(Err(e)) => { + self.last_write_err = Some(e.kind()); + Poll::Ready(()) + } + Poll::Ready(Ok(())) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + if let Some(e) = self.last_write_err.take() { + return Ready(Err(e.into())); + } + + let (op, buf) = match self.state { + Idle(_) => return Ready(Ok(())), + Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, + }; + + // The buffer is not used here + self.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => Ready(Ok(())), + Operation::Write(res) => Ready(res), + Operation::Seek(_) => Ready(Ok(())), + } + } +} + +#[cfg(test)] +mod tests; diff --git a/third_party/rust/tokio/src/fs/file/tests.rs b/third_party/rust/tokio/src/fs/file/tests.rs new file mode 100644 index 0000000000..7c61b3c4b3 --- /dev/null +++ b/third_party/rust/tokio/src/fs/file/tests.rs @@ -0,0 +1,978 @@ +use super::*; +use crate::{ + fs::mocks::*, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, +}; +use mockall::{predicate::eq, Sequence}; +use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task}; + +const HELLO: &[u8] = b"hello world..."; +const FOO: &[u8] = b"foo bar baz..."; + +#[test] +fn open_read() { + let mut file = MockFile::default(); + file.expect_inner_read().once().returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + let mut file = File::from_std(file); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_eq!(0, pool::len()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + + pool::run_one(); + + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); +} + +#[test] +fn read_twice_before_dispatch() { + let mut file = MockFile::default(); + file.expect_inner_read().once().returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + let mut file = File::from_std(file); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + assert_pending!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert_eq!(&buf[..n], HELLO); +} + +#[test] +fn read_with_smaller_buf() { + let mut file = MockFile::default(); + file.expect_inner_read().once().returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + + let mut file = File::from_std(file); + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + } + + pool::run_one(); + + { + let mut buf = [0; 4]; + let mut t = task::spawn(file.read(&mut buf)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 4); + assert_eq!(&buf[..], &HELLO[..n]); + } + + // Calling again immediately succeeds with the rest of the buffer + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 10); + assert_eq!(&buf[..n], &HELLO[4..]); + + assert_eq!(0, pool::len()); +} + +#[test] +fn read_with_bigger_buf() { + let mut seq = Sequence::new(); + let mut file = MockFile::default(); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..4].copy_from_slice(&HELLO[..4]); + Ok(4) + }); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len() - 4].copy_from_slice(&HELLO[4..]); + Ok(HELLO.len() - 4) + }); + + let mut file = File::from_std(file); + + { + let mut buf = [0; 4]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + } + + pool::run_one(); + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 4); + assert_eq!(&buf[..n], &HELLO[..n]); + } + + // Calling again immediately succeeds with the rest of the buffer + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 10); + assert_eq!(&buf[..n], &HELLO[4..]); + + assert_eq!(0, pool::len()); +} + +#[test] +fn read_err_then_read_success() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|_| Err(io::ErrorKind::Other.into())); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + + let mut file = File::from_std(file); + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + assert_ready_err!(t.poll()); + } + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let n = assert_ready_ok!(t.poll()); + + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); + } +} + +#[test] +fn open_write() { + let mut file = MockFile::default(); + file.expect_inner_write() + .once() + .with(eq(HELLO)) + .returning(|buf| Ok(buf.len())); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + + assert_eq!(0, pool::len()); + assert_ready_ok!(t.poll()); + + assert_eq!(1, pool::len()); + + pool::run_one(); + + assert!(!t.is_woken()); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn flush_while_idle() { + let file = MockFile::default(); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +#[cfg_attr(miri, ignore)] // takes a really long time with miri +fn read_with_buffer_larger_than_max() { + // Chunks + let chunk_a = crate::io::blocking::MAX_BUF; + let chunk_b = chunk_a * 2; + let chunk_c = chunk_a * 3; + let chunk_d = chunk_a * 4; + + assert_eq!(chunk_d / 1024 / 1024, 8); + + let mut data = vec![]; + for i in 0..(chunk_d - 1) { + data.push((i % 151) as u8); + } + let data = Arc::new(data); + let d0 = data.clone(); + let d1 = data.clone(); + let d2 = data.clone(); + let d3 = data.clone(); + + let mut seq = Sequence::new(); + let mut file = MockFile::default(); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(move |buf| { + buf[0..chunk_a].copy_from_slice(&d0[0..chunk_a]); + Ok(chunk_a) + }); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(move |buf| { + buf[..chunk_a].copy_from_slice(&d1[chunk_a..chunk_b]); + Ok(chunk_b - chunk_a) + }); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(move |buf| { + buf[..chunk_a].copy_from_slice(&d2[chunk_b..chunk_c]); + Ok(chunk_c - chunk_b) + }); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(move |buf| { + buf[..chunk_a - 1].copy_from_slice(&d3[chunk_c..]); + Ok(chunk_a - 1) + }); + let mut file = File::from_std(file); + + let mut actual = vec![0; chunk_d]; + let mut pos = 0; + + while pos < data.len() { + let mut t = task::spawn(file.read(&mut actual[pos..])); + + assert_pending!(t.poll()); + pool::run_one(); + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert!(n <= chunk_a); + + pos += n; + } + + assert_eq!(&data[..], &actual[..data.len()]); +} + +#[test] +#[cfg_attr(miri, ignore)] // takes a really long time with miri +fn write_with_buffer_larger_than_max() { + // Chunks + let chunk_a = crate::io::blocking::MAX_BUF; + let chunk_b = chunk_a * 2; + let chunk_c = chunk_a * 3; + let chunk_d = chunk_a * 4; + + assert_eq!(chunk_d / 1024 / 1024, 8); + + let mut data = vec![]; + for i in 0..(chunk_d - 1) { + data.push((i % 151) as u8); + } + let data = Arc::new(data); + let d0 = data.clone(); + let d1 = data.clone(); + let d2 = data.clone(); + let d3 = data.clone(); + + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .withf(move |buf| buf == &d0[0..chunk_a]) + .returning(|buf| Ok(buf.len())); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .withf(move |buf| buf == &d1[chunk_a..chunk_b]) + .returning(|buf| Ok(buf.len())); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .withf(move |buf| buf == &d2[chunk_b..chunk_c]) + .returning(|buf| Ok(buf.len())); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .withf(move |buf| buf == &d3[chunk_c..chunk_d - 1]) + .returning(|buf| Ok(buf.len())); + + let mut file = File::from_std(file); + + let mut rem = &data[..]; + + let mut first = true; + + while !rem.is_empty() { + let mut task = task::spawn(file.write(rem)); + + if !first { + assert_pending!(task.poll()); + pool::run_one(); + assert!(task.is_woken()); + } + + first = false; + + let n = assert_ready_ok!(task.poll()); + + rem = &rem[n..]; + } + + pool::run_one(); +} + +#[test] +fn write_twice_before_dispatch() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(HELLO)) + .returning(|buf| Ok(buf.len())); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(FOO)) + .returning(|buf| Ok(buf.len())); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.write(FOO)); + assert_pending!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + assert!(t.is_woken()); + + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.flush()); + assert_pending!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn incomplete_read_followed_by_write() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + file.expect_inner_seek() + .once() + .with(eq(SeekFrom::Current(-(HELLO.len() as i64)))) + .in_sequence(&mut seq) + .returning(|_| Ok(0)); + file.expect_inner_write() + .once() + .with(eq(FOO)) + .returning(|_| Ok(FOO.len())); + + let mut file = File::from_std(file); + + let mut buf = [0; 32]; + + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn incomplete_partial_read_followed_by_write() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + file.expect_inner_seek() + .once() + .in_sequence(&mut seq) + .with(eq(SeekFrom::Current(-10))) + .returning(|_| Ok(0)); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(FOO)) + .returning(|_| Ok(FOO.len())); + + let mut file = File::from_std(file); + + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let mut buf = [0; 4]; + let mut t = task::spawn(file.read(&mut buf)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn incomplete_read_followed_by_flush() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + file.expect_inner_seek() + .once() + .in_sequence(&mut seq) + .with(eq(SeekFrom::Current(-(HELLO.len() as i64)))) + .returning(|_| Ok(0)); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(FOO)) + .returning(|_| Ok(FOO.len())); + + let mut file = File::from_std(file); + + let mut buf = [0; 32]; + + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); +} + +#[test] +fn incomplete_flush_followed_by_write() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(HELLO)) + .returning(|_| Ok(HELLO.len())); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(FOO)) + .returning(|_| Ok(FOO.len())); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, HELLO.len()); + + let mut t = task::spawn(file.flush()); + assert_pending!(t.poll()); + + // TODO: Move under write + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn read_err() { + let mut file = MockFile::default(); + file.expect_inner_read() + .once() + .returning(|_| Err(io::ErrorKind::Other.into())); + + let mut file = File::from_std(file); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + pool::run_one(); + assert!(t.is_woken()); + + assert_ready_err!(t.poll()); +} + +#[test] +fn write_write_err() { + let mut file = MockFile::default(); + file.expect_inner_write() + .once() + .returning(|_| Err(io::ErrorKind::Other.into())); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_read_write_err() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .returning(|_| Err(io::ErrorKind::Other.into())); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_read_flush_err() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .returning(|_| Err(io::ErrorKind::Other.into())); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_seek_write_err() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .returning(|_| Err(io::ErrorKind::Other.into())); + file.expect_inner_seek() + .once() + .with(eq(SeekFrom::Start(0))) + .in_sequence(&mut seq) + .returning(|_| Ok(0)); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + { + let mut t = task::spawn(file.seek(SeekFrom::Start(0))); + assert_pending!(t.poll()); + } + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_seek_flush_err() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .returning(|_| Err(io::ErrorKind::Other.into())); + file.expect_inner_seek() + .once() + .with(eq(SeekFrom::Start(0))) + .in_sequence(&mut seq) + .returning(|_| Ok(0)); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + { + let mut t = task::spawn(file.seek(SeekFrom::Start(0))); + assert_pending!(t.poll()); + } + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_err!(t.poll()); +} + +#[test] +fn sync_all_ordered_after_write() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(HELLO)) + .returning(|_| Ok(HELLO.len())); + file.expect_sync_all().once().returning(|| Ok(())); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_all()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn sync_all_err_ordered_after_write() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(HELLO)) + .returning(|_| Ok(HELLO.len())); + file.expect_sync_all() + .once() + .returning(|| Err(io::ErrorKind::Other.into())); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_all()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_err!(t.poll()); +} + +#[test] +fn sync_data_ordered_after_write() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(HELLO)) + .returning(|_| Ok(HELLO.len())); + file.expect_sync_data().once().returning(|| Ok(())); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_data()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn sync_data_err_ordered_after_write() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .with(eq(HELLO)) + .returning(|_| Ok(HELLO.len())); + file.expect_sync_data() + .once() + .returning(|| Err(io::ErrorKind::Other.into())); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_data()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_err!(t.poll()); +} + +#[test] +fn open_set_len_ok() { + let mut file = MockFile::default(); + file.expect_set_len().with(eq(123)).returning(|_| Ok(())); + + let file = File::from_std(file); + let mut t = task::spawn(file.set_len(123)); + + assert_pending!(t.poll()); + + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn open_set_len_err() { + let mut file = MockFile::default(); + file.expect_set_len() + .with(eq(123)) + .returning(|_| Err(io::ErrorKind::Other.into())); + + let file = File::from_std(file); + let mut t = task::spawn(file.set_len(123)); + + assert_pending!(t.poll()); + + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_err!(t.poll()); +} + +#[test] +fn partial_read_set_len_ok() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..HELLO.len()].copy_from_slice(HELLO); + Ok(HELLO.len()) + }); + file.expect_inner_seek() + .once() + .with(eq(SeekFrom::Current(-(HELLO.len() as i64)))) + .in_sequence(&mut seq) + .returning(|_| Ok(0)); + file.expect_set_len() + .once() + .in_sequence(&mut seq) + .with(eq(123)) + .returning(|_| Ok(())); + file.expect_inner_read() + .once() + .in_sequence(&mut seq) + .returning(|buf| { + buf[0..FOO.len()].copy_from_slice(FOO); + Ok(FOO.len()) + }); + + let mut buf = [0; 32]; + let mut file = File::from_std(file); + + { + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + } + + pool::run_one(); + + { + let mut t = task::spawn(file.set_len(123)); + + assert_pending!(t.poll()); + pool::run_one(); + assert_ready_ok!(t.poll()); + } + + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + pool::run_one(); + let n = assert_ready_ok!(t.poll()); + + assert_eq!(n, FOO.len()); + assert_eq!(&buf[..n], FOO); +} + +#[test] +fn busy_file_seek_error() { + let mut file = MockFile::default(); + let mut seq = Sequence::new(); + file.expect_inner_write() + .once() + .in_sequence(&mut seq) + .returning(|_| Err(io::ErrorKind::Other.into())); + + let mut file = crate::io::BufReader::new(File::from_std(file)); + { + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + } + + pool::run_one(); + + let mut t = task::spawn(file.seek(SeekFrom::Start(0))); + assert_ready_err!(t.poll()); +} diff --git a/third_party/rust/tokio/src/fs/hard_link.rs b/third_party/rust/tokio/src/fs/hard_link.rs new file mode 100644 index 0000000000..50cc17d286 --- /dev/null +++ b/third_party/rust/tokio/src/fs/hard_link.rs @@ -0,0 +1,46 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Creates a new hard link on the filesystem. +/// +/// This is an async version of [`std::fs::hard_link`][std] +/// +/// [std]: std::fs::hard_link +/// +/// The `dst` path will be a link pointing to the `src` path. Note that systems +/// often require these two paths to both be located on the same filesystem. +/// +/// # Platform-specific behavior +/// +/// This function currently corresponds to the `link` function on Unix +/// and the `CreateHardLink` function on Windows. +/// Note that, this [may change in the future][changes]. +/// +/// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior +/// +/// # Errors +/// +/// This function will return an error in the following situations, but is not +/// limited to just these cases: +/// +/// * The `src` path is not a file or doesn't exist. +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// +/// #[tokio::main] +/// async fn main() -> std::io::Result<()> { +/// fs::hard_link("a.txt", "b.txt").await?; // Hard link a.txt to b.txt +/// Ok(()) +/// } +/// ``` +pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> { + let src = src.as_ref().to_owned(); + let dst = dst.as_ref().to_owned(); + + asyncify(move || std::fs::hard_link(src, dst)).await +} diff --git a/third_party/rust/tokio/src/fs/metadata.rs b/third_party/rust/tokio/src/fs/metadata.rs new file mode 100644 index 0000000000..ff9cded79a --- /dev/null +++ b/third_party/rust/tokio/src/fs/metadata.rs @@ -0,0 +1,47 @@ +use crate::fs::asyncify; + +use std::fs::Metadata; +use std::io; +use std::path::Path; + +/// Given a path, queries the file system to get information about a file, +/// directory, etc. +/// +/// This is an async version of [`std::fs::metadata`][std] +/// +/// This function will traverse symbolic links to query information about the +/// destination file. +/// +/// # Platform-specific behavior +/// +/// This function currently corresponds to the `stat` function on Unix and the +/// `GetFileAttributesEx` function on Windows. Note that, this [may change in +/// the future][changes]. +/// +/// [std]: std::fs::metadata +/// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior +/// +/// # Errors +/// +/// This function will return an error in the following situations, but is not +/// limited to just these cases: +/// +/// * The user lacks permissions to perform `metadata` call on `path`. +/// * `path` does not exist. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::fs; +/// +/// #[tokio::main] +/// async fn main() -> std::io::Result<()> { +/// let attr = fs::metadata("/some/file/path.txt").await?; +/// // inspect attr ... +/// Ok(()) +/// } +/// ``` +pub async fn metadata(path: impl AsRef<Path>) -> io::Result<Metadata> { + let path = path.as_ref().to_owned(); + asyncify(|| std::fs::metadata(path)).await +} diff --git a/third_party/rust/tokio/src/fs/mocks.rs b/third_party/rust/tokio/src/fs/mocks.rs new file mode 100644 index 0000000000..aa01e24711 --- /dev/null +++ b/third_party/rust/tokio/src/fs/mocks.rs @@ -0,0 +1,151 @@ +//! Mock version of std::fs::File; +use mockall::mock; + +use crate::sync::oneshot; +use std::{ + cell::RefCell, + collections::VecDeque, + fs::{Metadata, Permissions}, + future::Future, + io::{self, Read, Seek, SeekFrom, Write}, + path::PathBuf, + pin::Pin, + task::{Context, Poll}, +}; + +mock! { + #[derive(Debug)] + pub File { + pub fn create(pb: PathBuf) -> io::Result<Self>; + // These inner_ methods exist because std::fs::File has two + // implementations for each of these methods: one on "&mut self" and + // one on "&&self". Defining both of those in terms of an inner_ method + // allows us to specify the expectation the same way, regardless of + // which method is used. + pub fn inner_flush(&self) -> io::Result<()>; + pub fn inner_read(&self, dst: &mut [u8]) -> io::Result<usize>; + pub fn inner_seek(&self, pos: SeekFrom) -> io::Result<u64>; + pub fn inner_write(&self, src: &[u8]) -> io::Result<usize>; + pub fn metadata(&self) -> io::Result<Metadata>; + pub fn open(pb: PathBuf) -> io::Result<Self>; + pub fn set_len(&self, size: u64) -> io::Result<()>; + pub fn set_permissions(&self, _perm: Permissions) -> io::Result<()>; + pub fn sync_all(&self) -> io::Result<()>; + pub fn sync_data(&self) -> io::Result<()>; + pub fn try_clone(&self) -> io::Result<Self>; + } + #[cfg(windows)] + impl std::os::windows::io::AsRawHandle for File { + fn as_raw_handle(&self) -> std::os::windows::io::RawHandle; + } + #[cfg(windows)] + impl std::os::windows::io::FromRawHandle for File { + unsafe fn from_raw_handle(h: std::os::windows::io::RawHandle) -> Self; + } + #[cfg(unix)] + impl std::os::unix::io::AsRawFd for File { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd; + } + + #[cfg(unix)] + impl std::os::unix::io::FromRawFd for File { + unsafe fn from_raw_fd(h: std::os::unix::io::RawFd) -> Self; + } +} + +impl Read for MockFile { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.inner_read(dst) + } +} + +impl Read for &'_ MockFile { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.inner_read(dst) + } +} + +impl Seek for &'_ MockFile { + fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { + self.inner_seek(pos) + } +} + +impl Write for &'_ MockFile { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.inner_write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner_flush() + } +} + +tokio_thread_local! { + static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new()) +} + +#[derive(Debug)] +pub(super) struct JoinHandle<T> { + rx: oneshot::Receiver<T>, +} + +pub(super) fn spawn_blocking<F, R>(f: F) -> JoinHandle<R> +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + let task = Box::new(move || { + let _ = tx.send(f()); + }); + + QUEUE.with(|cell| cell.borrow_mut().push_back(task)); + + JoinHandle { rx } +} + +pub(super) fn spawn_mandatory_blocking<F, R>(f: F) -> Option<JoinHandle<R>> +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + let task = Box::new(move || { + let _ = tx.send(f()); + }); + + QUEUE.with(|cell| cell.borrow_mut().push_back(task)); + + Some(JoinHandle { rx }) +} + +impl<T> Future for JoinHandle<T> { + type Output = Result<T, io::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + use std::task::Poll::*; + + match Pin::new(&mut self.rx).poll(cx) { + Ready(Ok(v)) => Ready(Ok(v)), + Ready(Err(e)) => panic!("error = {:?}", e), + Pending => Pending, + } + } +} + +pub(super) mod pool { + use super::*; + + pub(in super::super) fn len() -> usize { + QUEUE.with(|cell| cell.borrow().len()) + } + + pub(in super::super) fn run_one() { + let task = QUEUE + .with(|cell| cell.borrow_mut().pop_front()) + .expect("expected task to run, but none ready"); + + task(); + } +} diff --git a/third_party/rust/tokio/src/fs/mod.rs b/third_party/rust/tokio/src/fs/mod.rs new file mode 100644 index 0000000000..f6d9605fe9 --- /dev/null +++ b/third_party/rust/tokio/src/fs/mod.rs @@ -0,0 +1,145 @@ +#![cfg(not(loom))] + +//! Asynchronous file and standard stream adaptation. +//! +//! This module contains utility methods and adapter types for input/output to +//! files or standard streams (`Stdin`, `Stdout`, `Stderr`), and +//! filesystem manipulation, for use within (and only within) a Tokio runtime. +//! +//! Tasks run by *worker* threads should not block, as this could delay +//! servicing reactor events. Portable filesystem operations are blocking, +//! however. This module offers adapters which use a `blocking` annotation +//! to inform the runtime that a blocking operation is required. When +//! necessary, this allows the runtime to convert the current thread from a +//! *worker* to a *backup* thread, where blocking is acceptable. +//! +//! ## Usage +//! +//! Where possible, users should prefer the provided asynchronous-specific +//! traits such as [`AsyncRead`], or methods returning a `Future` or `Poll` +//! type. Adaptions also extend to traits like `std::io::Read` where methods +//! return `std::io::Result`. Be warned that these adapted methods may return +//! `std::io::ErrorKind::WouldBlock` if a *worker* thread can not be converted +//! to a *backup* thread immediately. +//! +//! **Warning**: These adapters may create a large number of temporary tasks, +//! especially when reading large files. When performing a lot of operations +//! in one batch, it may be significantly faster to use [`spawn_blocking`] +//! directly: +//! +//! ``` +//! use tokio::fs::File; +//! use std::io::{BufReader, BufRead}; +//! async fn count_lines(file: File) -> Result<usize, std::io::Error> { +//! let file = file.into_std().await; +//! tokio::task::spawn_blocking(move || { +//! let line_count = BufReader::new(file).lines().count(); +//! Ok(line_count) +//! }).await? +//! } +//! ``` +//! +//! [`spawn_blocking`]: fn@crate::task::spawn_blocking +//! [`AsyncRead`]: trait@crate::io::AsyncRead + +mod canonicalize; +pub use self::canonicalize::canonicalize; + +mod create_dir; +pub use self::create_dir::create_dir; + +mod create_dir_all; +pub use self::create_dir_all::create_dir_all; + +mod dir_builder; +pub use self::dir_builder::DirBuilder; + +mod file; +pub use self::file::File; + +mod hard_link; +pub use self::hard_link::hard_link; + +mod metadata; +pub use self::metadata::metadata; + +mod open_options; +pub use self::open_options::OpenOptions; + +mod read; +pub use self::read::read; + +mod read_dir; +pub use self::read_dir::{read_dir, DirEntry, ReadDir}; + +mod read_link; +pub use self::read_link::read_link; + +mod read_to_string; +pub use self::read_to_string::read_to_string; + +mod remove_dir; +pub use self::remove_dir::remove_dir; + +mod remove_dir_all; +pub use self::remove_dir_all::remove_dir_all; + +mod remove_file; +pub use self::remove_file::remove_file; + +mod rename; +pub use self::rename::rename; + +mod set_permissions; +pub use self::set_permissions::set_permissions; + +mod symlink_metadata; +pub use self::symlink_metadata::symlink_metadata; + +mod write; +pub use self::write::write; + +mod copy; +pub use self::copy::copy; + +mod try_exists; +pub use self::try_exists::try_exists; + +#[cfg(test)] +mod mocks; + +feature! { + #![unix] + + mod symlink; + pub use self::symlink::symlink; +} + +cfg_windows! { + mod symlink_dir; + pub use self::symlink_dir::symlink_dir; + + mod symlink_file; + pub use self::symlink_file::symlink_file; +} + +use std::io; + +#[cfg(not(test))] +use crate::blocking::spawn_blocking; +#[cfg(test)] +use mocks::spawn_blocking; + +pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T> +where + F: FnOnce() -> io::Result<T> + Send + 'static, + T: Send + 'static, +{ + match spawn_blocking(f).await { + Ok(res) => res, + Err(_) => Err(io::Error::new( + io::ErrorKind::Other, + "background task failed", + )), + } +} diff --git a/third_party/rust/tokio/src/fs/open_options.rs b/third_party/rust/tokio/src/fs/open_options.rs new file mode 100644 index 0000000000..103510250b --- /dev/null +++ b/third_party/rust/tokio/src/fs/open_options.rs @@ -0,0 +1,664 @@ +use crate::fs::{asyncify, File}; + +use std::io; +use std::path::Path; + +#[cfg(test)] +mod mock_open_options; +#[cfg(test)] +use mock_open_options::MockOpenOptions as StdOpenOptions; +#[cfg(not(test))] +use std::fs::OpenOptions as StdOpenOptions; + +#[cfg(unix)] +use std::os::unix::fs::OpenOptionsExt; +#[cfg(windows)] +use std::os::windows::fs::OpenOptionsExt; + +/// Options and flags which can be used to configure how a file is opened. +/// +/// This builder exposes the ability to configure how a [`File`] is opened and +/// what operations are permitted on the open file. The [`File::open`] and +/// [`File::create`] methods are aliases for commonly used options using this +/// builder. +/// +/// Generally speaking, when using `OpenOptions`, you'll first call [`new`], +/// then chain calls to methods to set each option, then call [`open`], passing +/// the path of the file you're trying to open. This will give you a +/// [`io::Result`][result] with a [`File`] inside that you can further operate +/// on. +/// +/// This is a specialized version of [`std::fs::OpenOptions`] for usage from +/// the Tokio runtime. +/// +/// `From<std::fs::OpenOptions>` is implemented for more advanced configuration +/// than the methods provided here. +/// +/// [`new`]: OpenOptions::new +/// [`open`]: OpenOptions::open +/// [result]: std::io::Result +/// [`File`]: File +/// [`File::open`]: File::open +/// [`File::create`]: File::create +/// [`std::fs::OpenOptions`]: std::fs::OpenOptions +/// +/// # Examples +/// +/// Opening a file to read: +/// +/// ```no_run +/// use tokio::fs::OpenOptions; +/// use std::io; +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let file = OpenOptions::new() +/// .read(true) +/// .open("foo.txt") +/// .await?; +/// +/// Ok(()) +/// } +/// ``` +/// +/// Opening a file for both reading and writing, as well as creating it if it +/// doesn't exist: +/// +/// ```no_run +/// use tokio::fs::OpenOptions; +/// use std::io; +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let file = OpenOptions::new() +/// .read(true) +/// .write(true) +/// .create(true) +/// .open("foo.txt") +/// .await?; +/// +/// Ok(()) +/// } +/// ``` +#[derive(Clone, Debug)] +pub struct OpenOptions(StdOpenOptions); + +impl OpenOptions { + /// Creates a blank new set of options ready for configuration. + /// + /// All options are initially set to `false`. + /// + /// This is an async version of [`std::fs::OpenOptions::new`][std] + /// + /// [std]: std::fs::OpenOptions::new + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// + /// let mut options = OpenOptions::new(); + /// let future = options.read(true).open("foo.txt"); + /// ``` + pub fn new() -> OpenOptions { + OpenOptions(StdOpenOptions::new()) + } + + /// Sets the option for read access. + /// + /// This option, when true, will indicate that the file should be + /// `read`-able if opened. + /// + /// This is an async version of [`std::fs::OpenOptions::read`][std] + /// + /// [std]: std::fs::OpenOptions::read + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let file = OpenOptions::new() + /// .read(true) + /// .open("foo.txt") + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn read(&mut self, read: bool) -> &mut OpenOptions { + self.0.read(read); + self + } + + /// Sets the option for write access. + /// + /// This option, when true, will indicate that the file should be + /// `write`-able if opened. + /// + /// This is an async version of [`std::fs::OpenOptions::write`][std] + /// + /// [std]: std::fs::OpenOptions::write + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let file = OpenOptions::new() + /// .write(true) + /// .open("foo.txt") + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn write(&mut self, write: bool) -> &mut OpenOptions { + self.0.write(write); + self + } + + /// Sets the option for the append mode. + /// + /// This option, when true, means that writes will append to a file instead + /// of overwriting previous contents. Note that setting + /// `.write(true).append(true)` has the same effect as setting only + /// `.append(true)`. + /// + /// For most filesystems, the operating system guarantees that all writes are + /// atomic: no writes get mangled because another process writes at the same + /// time. + /// + /// One maybe obvious note when using append-mode: make sure that all data + /// that belongs together is written to the file in one operation. This + /// can be done by concatenating strings before passing them to [`write()`], + /// or using a buffered writer (with a buffer of adequate size), + /// and calling [`flush()`] when the message is complete. + /// + /// If a file is opened with both read and append access, beware that after + /// opening, and after every write, the position for reading may be set at the + /// end of the file. So, before writing, save the current position (using + /// [`seek`]`(`[`SeekFrom`]`::`[`Current`]`(0))`), and restore it before the next read. + /// + /// This is an async version of [`std::fs::OpenOptions::append`][std] + /// + /// [std]: std::fs::OpenOptions::append + /// + /// ## Note + /// + /// This function doesn't create the file if it doesn't exist. Use the [`create`] + /// method to do so. + /// + /// [`write()`]: crate::io::AsyncWriteExt::write + /// [`flush()`]: crate::io::AsyncWriteExt::flush + /// [`seek`]: crate::io::AsyncSeekExt::seek + /// [`SeekFrom`]: std::io::SeekFrom + /// [`Current`]: std::io::SeekFrom::Current + /// [`create`]: OpenOptions::create + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let file = OpenOptions::new() + /// .append(true) + /// .open("foo.txt") + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn append(&mut self, append: bool) -> &mut OpenOptions { + self.0.append(append); + self + } + + /// Sets the option for truncating a previous file. + /// + /// If a file is successfully opened with this option set it will truncate + /// the file to 0 length if it already exists. + /// + /// The file must be opened with write access for truncate to work. + /// + /// This is an async version of [`std::fs::OpenOptions::truncate`][std] + /// + /// [std]: std::fs::OpenOptions::truncate + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let file = OpenOptions::new() + /// .write(true) + /// .truncate(true) + /// .open("foo.txt") + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions { + self.0.truncate(truncate); + self + } + + /// Sets the option for creating a new file. + /// + /// This option indicates whether a new file will be created if the file + /// does not yet already exist. + /// + /// In order for the file to be created, [`write`] or [`append`] access must + /// be used. + /// + /// This is an async version of [`std::fs::OpenOptions::create`][std] + /// + /// [std]: std::fs::OpenOptions::create + /// [`write`]: OpenOptions::write + /// [`append`]: OpenOptions::append + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let file = OpenOptions::new() + /// .write(true) + /// .create(true) + /// .open("foo.txt") + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn create(&mut self, create: bool) -> &mut OpenOptions { + self.0.create(create); + self + } + + /// Sets the option to always create a new file. + /// + /// This option indicates whether a new file will be created. No file is + /// allowed to exist at the target location, also no (dangling) symlink. + /// + /// This option is useful because it is atomic. Otherwise between checking + /// whether a file exists and creating a new one, the file may have been + /// created by another process (a TOCTOU race condition / attack). + /// + /// If `.create_new(true)` is set, [`.create()`] and [`.truncate()`] are + /// ignored. + /// + /// The file must be opened with write or append access in order to create a + /// new file. + /// + /// This is an async version of [`std::fs::OpenOptions::create_new`][std] + /// + /// [std]: std::fs::OpenOptions::create_new + /// [`.create()`]: OpenOptions::create + /// [`.truncate()`]: OpenOptions::truncate + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let file = OpenOptions::new() + /// .write(true) + /// .create_new(true) + /// .open("foo.txt") + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions { + self.0.create_new(create_new); + self + } + + /// Opens a file at `path` with the options specified by `self`. + /// + /// This is an async version of [`std::fs::OpenOptions::open`][std] + /// + /// [std]: std::fs::OpenOptions::open + /// + /// # Errors + /// + /// This function will return an error under a number of different + /// circumstances. Some of these error conditions are listed here, together + /// with their [`ErrorKind`]. The mapping to [`ErrorKind`]s is not part of + /// the compatibility contract of the function, especially the `Other` kind + /// might change to more specific kinds in the future. + /// + /// * [`NotFound`]: The specified file does not exist and neither `create` + /// or `create_new` is set. + /// * [`NotFound`]: One of the directory components of the file path does + /// not exist. + /// * [`PermissionDenied`]: The user lacks permission to get the specified + /// access rights for the file. + /// * [`PermissionDenied`]: The user lacks permission to open one of the + /// directory components of the specified path. + /// * [`AlreadyExists`]: `create_new` was specified and the file already + /// exists. + /// * [`InvalidInput`]: Invalid combinations of open options (truncate + /// without write access, no access mode set, etc.). + /// * [`Other`]: One of the directory components of the specified file path + /// was not, in fact, a directory. + /// * [`Other`]: Filesystem-level errors: full disk, write permission + /// requested on a read-only file system, exceeded disk quota, too many + /// open files, too long filename, too many symbolic links in the + /// specified path (Unix-like systems only), etc. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let file = OpenOptions::new().open("foo.txt").await?; + /// Ok(()) + /// } + /// ``` + /// + /// [`ErrorKind`]: std::io::ErrorKind + /// [`AlreadyExists`]: std::io::ErrorKind::AlreadyExists + /// [`InvalidInput`]: std::io::ErrorKind::InvalidInput + /// [`NotFound`]: std::io::ErrorKind::NotFound + /// [`Other`]: std::io::ErrorKind::Other + /// [`PermissionDenied`]: std::io::ErrorKind::PermissionDenied + pub async fn open(&self, path: impl AsRef<Path>) -> io::Result<File> { + let path = path.as_ref().to_owned(); + let opts = self.0.clone(); + + let std = asyncify(move || opts.open(path)).await?; + Ok(File::from_std(std)) + } + + /// Returns a mutable reference to the underlying `std::fs::OpenOptions` + pub(super) fn as_inner_mut(&mut self) -> &mut StdOpenOptions { + &mut self.0 + } +} + +feature! { + #![unix] + + impl OpenOptions { + /// Sets the mode bits that a new file will be created with. + /// + /// If a new file is created as part of an `OpenOptions::open` call then this + /// specified `mode` will be used as the permission bits for the new file. + /// If no `mode` is set, the default of `0o666` will be used. + /// The operating system masks out bits with the system's `umask`, to produce + /// the final permissions. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut options = OpenOptions::new(); + /// options.mode(0o644); // Give read/write for owner and read for others. + /// let file = options.open("foo.txt").await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn mode(&mut self, mode: u32) -> &mut OpenOptions { + self.as_inner_mut().mode(mode); + self + } + + /// Passes custom flags to the `flags` argument of `open`. + /// + /// The bits that define the access mode are masked out with `O_ACCMODE`, to + /// ensure they do not interfere with the access mode set by Rusts options. + /// + /// Custom flags can only set flags, not remove flags set by Rusts options. + /// This options overwrites any previously set custom flags. + /// + /// # Examples + /// + /// ```no_run + /// use libc; + /// use tokio::fs::OpenOptions; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut options = OpenOptions::new(); + /// options.write(true); + /// if cfg!(unix) { + /// options.custom_flags(libc::O_NOFOLLOW); + /// } + /// let file = options.open("foo.txt").await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions { + self.as_inner_mut().custom_flags(flags); + self + } + } +} + +cfg_windows! { + impl OpenOptions { + /// Overrides the `dwDesiredAccess` argument to the call to [`CreateFile`] + /// with the specified value. + /// + /// This will override the `read`, `write`, and `append` flags on the + /// `OpenOptions` structure. This method provides fine-grained control over + /// the permissions to read, write and append data, attributes (like hidden + /// and system), and extended attributes. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// // Open without read and write permission, for example if you only need + /// // to call `stat` on the file + /// let file = OpenOptions::new().access_mode(0).open("foo.txt").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea + pub fn access_mode(&mut self, access: u32) -> &mut OpenOptions { + self.as_inner_mut().access_mode(access); + self + } + + /// Overrides the `dwShareMode` argument to the call to [`CreateFile`] with + /// the specified value. + /// + /// By default `share_mode` is set to + /// `FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE`. This allows + /// other processes to read, write, and delete/rename the same file + /// while it is open. Removing any of the flags will prevent other + /// processes from performing the corresponding operation until the file + /// handle is closed. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::OpenOptions; + /// + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// // Do not allow others to read or modify this file while we have it open + /// // for writing. + /// let file = OpenOptions::new() + /// .write(true) + /// .share_mode(0) + /// .open("foo.txt").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea + pub fn share_mode(&mut self, share: u32) -> &mut OpenOptions { + self.as_inner_mut().share_mode(share); + self + } + + /// Sets extra flags for the `dwFileFlags` argument to the call to + /// [`CreateFile2`] to the specified value (or combines it with + /// `attributes` and `security_qos_flags` to set the `dwFlagsAndAttributes` + /// for [`CreateFile`]). + /// + /// Custom flags can only set flags, not remove flags set by Rust's options. + /// This option overwrites any previously set custom flags. + /// + /// # Examples + /// + /// ```no_run + /// use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_DELETE_ON_CLOSE; + /// use tokio::fs::OpenOptions; + /// + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// let file = OpenOptions::new() + /// .create(true) + /// .write(true) + /// .custom_flags(FILE_FLAG_DELETE_ON_CLOSE) + /// .open("foo.txt").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea + /// [`CreateFile2`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfile2 + pub fn custom_flags(&mut self, flags: u32) -> &mut OpenOptions { + self.as_inner_mut().custom_flags(flags); + self + } + + /// Sets the `dwFileAttributes` argument to the call to [`CreateFile2`] to + /// the specified value (or combines it with `custom_flags` and + /// `security_qos_flags` to set the `dwFlagsAndAttributes` for + /// [`CreateFile`]). + /// + /// If a _new_ file is created because it does not yet exist and + /// `.create(true)` or `.create_new(true)` are specified, the new file is + /// given the attributes declared with `.attributes()`. + /// + /// If an _existing_ file is opened with `.create(true).truncate(true)`, its + /// existing attributes are preserved and combined with the ones declared + /// with `.attributes()`. + /// + /// In all other cases the attributes get ignored. + /// + /// # Examples + /// + /// ```no_run + /// use windows_sys::Win32::Storage::FileSystem::FILE_ATTRIBUTE_HIDDEN; + /// use tokio::fs::OpenOptions; + /// + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// let file = OpenOptions::new() + /// .write(true) + /// .create(true) + /// .attributes(FILE_ATTRIBUTE_HIDDEN) + /// .open("foo.txt").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea + /// [`CreateFile2`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfile2 + pub fn attributes(&mut self, attributes: u32) -> &mut OpenOptions { + self.as_inner_mut().attributes(attributes); + self + } + + /// Sets the `dwSecurityQosFlags` argument to the call to [`CreateFile2`] to + /// the specified value (or combines it with `custom_flags` and `attributes` + /// to set the `dwFlagsAndAttributes` for [`CreateFile`]). + /// + /// By default `security_qos_flags` is not set. It should be specified when + /// opening a named pipe, to control to which degree a server process can + /// act on behalf of a client process (security impersonation level). + /// + /// When `security_qos_flags` is not set, a malicious program can gain the + /// elevated privileges of a privileged Rust process when it allows opening + /// user-specified paths, by tricking it into opening a named pipe. So + /// arguably `security_qos_flags` should also be set when opening arbitrary + /// paths. However the bits can then conflict with other flags, specifically + /// `FILE_FLAG_OPEN_NO_RECALL`. + /// + /// For information about possible values, see [Impersonation Levels] on the + /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set + /// automatically when using this method. + /// + /// # Examples + /// + /// ```no_run + /// use windows_sys::Win32::Storage::FileSystem::SECURITY_IDENTIFICATION; + /// use tokio::fs::OpenOptions; + /// + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// let file = OpenOptions::new() + /// .write(true) + /// .create(true) + /// + /// // Sets the flag value to `SecurityIdentification`. + /// .security_qos_flags(SECURITY_IDENTIFICATION) + /// + /// .open(r"\\.\pipe\MyPipe").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea + /// [`CreateFile2`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfile2 + /// [Impersonation Levels]: + /// https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level + pub fn security_qos_flags(&mut self, flags: u32) -> &mut OpenOptions { + self.as_inner_mut().security_qos_flags(flags); + self + } + } +} + +impl From<StdOpenOptions> for OpenOptions { + fn from(options: StdOpenOptions) -> OpenOptions { + OpenOptions(options) + } +} + +impl Default for OpenOptions { + fn default() -> Self { + Self::new() + } +} diff --git a/third_party/rust/tokio/src/fs/open_options/mock_open_options.rs b/third_party/rust/tokio/src/fs/open_options/mock_open_options.rs new file mode 100644 index 0000000000..17b4a48640 --- /dev/null +++ b/third_party/rust/tokio/src/fs/open_options/mock_open_options.rs @@ -0,0 +1,39 @@ +#![allow(unreachable_pub)] +//! Mock version of std::fs::OpenOptions; +use mockall::mock; + +use crate::fs::mocks::MockFile; +#[cfg(unix)] +use std::os::unix::fs::OpenOptionsExt; +#[cfg(windows)] +use std::os::windows::fs::OpenOptionsExt; +use std::{io, path::Path}; + +mock! { + #[derive(Debug)] + pub OpenOptions { + pub fn append(&mut self, append: bool) -> &mut Self; + pub fn create(&mut self, create: bool) -> &mut Self; + pub fn create_new(&mut self, create_new: bool) -> &mut Self; + pub fn open<P: AsRef<Path> + 'static>(&self, path: P) -> io::Result<MockFile>; + pub fn read(&mut self, read: bool) -> &mut Self; + pub fn truncate(&mut self, truncate: bool) -> &mut Self; + pub fn write(&mut self, write: bool) -> &mut Self; + } + impl Clone for OpenOptions { + fn clone(&self) -> Self; + } + #[cfg(unix)] + impl OpenOptionsExt for OpenOptions { + fn custom_flags(&mut self, flags: i32) -> &mut Self; + fn mode(&mut self, mode: u32) -> &mut Self; + } + #[cfg(windows)] + impl OpenOptionsExt for OpenOptions { + fn access_mode(&mut self, access: u32) -> &mut Self; + fn share_mode(&mut self, val: u32) -> &mut Self; + fn custom_flags(&mut self, flags: u32) -> &mut Self; + fn attributes(&mut self, val: u32) -> &mut Self; + fn security_qos_flags(&mut self, flags: u32) -> &mut Self; + } +} diff --git a/third_party/rust/tokio/src/fs/read.rs b/third_party/rust/tokio/src/fs/read.rs new file mode 100644 index 0000000000..ada5ba391b --- /dev/null +++ b/third_party/rust/tokio/src/fs/read.rs @@ -0,0 +1,51 @@ +use crate::fs::asyncify; + +use std::{io, path::Path}; + +/// Reads the entire contents of a file into a bytes vector. +/// +/// This is an async version of [`std::fs::read`][std] +/// +/// [std]: std::fs::read +/// +/// This is a convenience function for using [`File::open`] and [`read_to_end`] +/// with fewer imports and without an intermediate variable. It pre-allocates a +/// buffer based on the file size when available, so it is generally faster than +/// reading into a vector created with `Vec::new()`. +/// +/// This operation is implemented by running the equivalent blocking operation +/// on a separate thread pool using [`spawn_blocking`]. +/// +/// [`File::open`]: super::File::open +/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end +/// [`spawn_blocking`]: crate::task::spawn_blocking +/// +/// # Errors +/// +/// This function will return an error if `path` does not already exist. +/// Other errors may also be returned according to [`OpenOptions::open`]. +/// +/// [`OpenOptions::open`]: super::OpenOptions::open +/// +/// It will also return an error if it encounters while reading an error +/// of a kind other than [`ErrorKind::Interrupted`]. +/// +/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// use std::net::SocketAddr; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> { +/// let contents = fs::read("address.txt").await?; +/// let foo: SocketAddr = String::from_utf8_lossy(&contents).parse()?; +/// Ok(()) +/// } +/// ``` +pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::read(path)).await +} diff --git a/third_party/rust/tokio/src/fs/read_dir.rs b/third_party/rust/tokio/src/fs/read_dir.rs new file mode 100644 index 0000000000..def735b3c0 --- /dev/null +++ b/third_party/rust/tokio/src/fs/read_dir.rs @@ -0,0 +1,355 @@ +use crate::fs::asyncify; + +use std::collections::VecDeque; +use std::ffi::OsString; +use std::fs::{FileType, Metadata}; +use std::future::Future; +use std::io; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +#[cfg(test)] +use super::mocks::spawn_blocking; +#[cfg(test)] +use super::mocks::JoinHandle; +#[cfg(not(test))] +use crate::blocking::spawn_blocking; +#[cfg(not(test))] +use crate::blocking::JoinHandle; + +const CHUNK_SIZE: usize = 32; + +/// Returns a stream over the entries within a directory. +/// +/// This is an async version of [`std::fs::read_dir`](std::fs::read_dir) +/// +/// This operation is implemented by running the equivalent blocking +/// operation on a separate thread pool using [`spawn_blocking`]. +/// +/// [`spawn_blocking`]: crate::task::spawn_blocking +pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> { + let path = path.as_ref().to_owned(); + asyncify(|| -> io::Result<ReadDir> { + let mut std = std::fs::read_dir(path)?; + let mut buf = VecDeque::with_capacity(CHUNK_SIZE); + let remain = ReadDir::next_chunk(&mut buf, &mut std); + + Ok(ReadDir(State::Idle(Some((buf, std, remain))))) + }) + .await +} + +/// Reads the entries in a directory. +/// +/// This struct is returned from the [`read_dir`] function of this module and +/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information +/// like the entry's path and possibly other metadata can be learned. +/// +/// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`]. +/// +/// [`ReadDirStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReadDirStream.html +/// +/// # Errors +/// +/// This stream will return an [`Err`] if there's some sort of intermittent +/// IO error during iteration. +/// +/// [`read_dir`]: read_dir +/// [`DirEntry`]: DirEntry +/// [`Err`]: std::result::Result::Err +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct ReadDir(State); + +#[derive(Debug)] +enum State { + Idle(Option<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>), + Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>), +} + +impl ReadDir { + /// Returns the next entry in the directory stream. + /// + /// # Cancel safety + /// + /// This method is cancellation safe. + pub async fn next_entry(&mut self) -> io::Result<Option<DirEntry>> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_next_entry(cx)).await + } + + /// Polls for the next directory entry in the stream. + /// + /// This method returns: + /// + /// * `Poll::Pending` if the next directory entry is not yet available. + /// * `Poll::Ready(Ok(Some(entry)))` if the next directory entry is available. + /// * `Poll::Ready(Ok(None))` if there are no more directory entries in this + /// stream. + /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next + /// directory entry. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when the next directory entry + /// becomes available on the underlying IO resource. + /// + /// Note that on multiple calls to `poll_next_entry`, only the `Waker` from + /// the `Context` passed to the most recent call is scheduled to receive a + /// wakeup. + pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> { + loop { + match self.0 { + State::Idle(ref mut data) => { + let (buf, _, ref remain) = data.as_mut().unwrap(); + + if let Some(ent) = buf.pop_front() { + return Poll::Ready(ent.map(Some)); + } else if !remain { + return Poll::Ready(Ok(None)); + } + + let (mut buf, mut std, _) = data.take().unwrap(); + + self.0 = State::Pending(spawn_blocking(move || { + let remain = ReadDir::next_chunk(&mut buf, &mut std); + (buf, std, remain) + })); + } + State::Pending(ref mut rx) => { + self.0 = State::Idle(Some(ready!(Pin::new(rx).poll(cx))?)); + } + } + } + } + + fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut std::fs::ReadDir) -> bool { + for _ in 0..CHUNK_SIZE { + let ret = match std.next() { + Some(ret) => ret, + None => return false, + }; + + let success = ret.is_ok(); + + buf.push_back(ret.map(|std| DirEntry { + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks", + target_os = "nto", + target_os = "vita", + )))] + file_type: std.file_type().ok(), + std: Arc::new(std), + })); + + if !success { + break; + } + } + + true + } +} + +feature! { + #![unix] + + use std::os::unix::fs::DirEntryExt; + + impl DirEntry { + /// Returns the underlying `d_ino` field in the contained `dirent` + /// structure. + /// + /// # Examples + /// + /// ``` + /// use tokio::fs; + /// + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// let mut entries = fs::read_dir(".").await?; + /// while let Some(entry) = entries.next_entry().await? { + /// // Here, `entry` is a `DirEntry`. + /// println!("{:?}: {}", entry.file_name(), entry.ino()); + /// } + /// # Ok(()) + /// # } + /// ``` + pub fn ino(&self) -> u64 { + self.as_inner().ino() + } + } +} + +/// Entries returned by the [`ReadDir`] stream. +/// +/// [`ReadDir`]: struct@ReadDir +/// +/// This is a specialized version of [`std::fs::DirEntry`] for usage from the +/// Tokio runtime. +/// +/// An instance of `DirEntry` represents an entry inside of a directory on the +/// filesystem. Each entry can be inspected via methods to learn about the full +/// path or possibly other metadata through per-platform extension traits. +#[derive(Debug)] +pub struct DirEntry { + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks", + target_os = "nto", + target_os = "vita", + )))] + file_type: Option<FileType>, + std: Arc<std::fs::DirEntry>, +} + +impl DirEntry { + /// Returns the full path to the file that this entry represents. + /// + /// The full path is created by joining the original path to `read_dir` + /// with the filename of this entry. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut entries = fs::read_dir(".").await?; + /// + /// while let Some(entry) = entries.next_entry().await? { + /// println!("{:?}", entry.path()); + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// This prints output like: + /// + /// ```text + /// "./whatever.txt" + /// "./foo.html" + /// "./hello_world.rs" + /// ``` + /// + /// The exact text, of course, depends on what files you have in `.`. + pub fn path(&self) -> PathBuf { + self.std.path() + } + + /// Returns the bare file name of this directory entry without any other + /// leading path component. + /// + /// # Examples + /// + /// ``` + /// use tokio::fs; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut entries = fs::read_dir(".").await?; + /// + /// while let Some(entry) = entries.next_entry().await? { + /// println!("{:?}", entry.file_name()); + /// } + /// # Ok(()) + /// # } + /// ``` + pub fn file_name(&self) -> OsString { + self.std.file_name() + } + + /// Returns the metadata for the file that this entry points at. + /// + /// This function will not traverse symlinks if this entry points at a + /// symlink. + /// + /// # Platform-specific behavior + /// + /// On Windows this function is cheap to call (no extra system calls + /// needed), but on Unix platforms this function is the equivalent of + /// calling `symlink_metadata` on the path. + /// + /// # Examples + /// + /// ``` + /// use tokio::fs; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut entries = fs::read_dir(".").await?; + /// + /// while let Some(entry) = entries.next_entry().await? { + /// if let Ok(metadata) = entry.metadata().await { + /// // Now let's show our entry's permissions! + /// println!("{:?}: {:?}", entry.path(), metadata.permissions()); + /// } else { + /// println!("Couldn't get file type for {:?}", entry.path()); + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn metadata(&self) -> io::Result<Metadata> { + let std = self.std.clone(); + asyncify(move || std.metadata()).await + } + + /// Returns the file type for the file that this entry points at. + /// + /// This function will not traverse symlinks if this entry points at a + /// symlink. + /// + /// # Platform-specific behavior + /// + /// On Windows and most Unix platforms this function is free (no extra + /// system calls needed), but some Unix platforms may require the equivalent + /// call to `symlink_metadata` to learn about the target file type. + /// + /// # Examples + /// + /// ``` + /// use tokio::fs; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut entries = fs::read_dir(".").await?; + /// + /// while let Some(entry) = entries.next_entry().await? { + /// if let Ok(file_type) = entry.file_type().await { + /// // Now let's show our entry's file type! + /// println!("{:?}: {:?}", entry.path(), file_type); + /// } else { + /// println!("Couldn't get file type for {:?}", entry.path()); + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn file_type(&self) -> io::Result<FileType> { + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks", + target_os = "nto", + target_os = "vita", + )))] + if let Some(file_type) = self.file_type { + return Ok(file_type); + } + + let std = self.std.clone(); + asyncify(move || std.file_type()).await + } + + /// Returns a reference to the underlying `std::fs::DirEntry`. + #[cfg(unix)] + pub(super) fn as_inner(&self) -> &std::fs::DirEntry { + &self.std + } +} diff --git a/third_party/rust/tokio/src/fs/read_link.rs b/third_party/rust/tokio/src/fs/read_link.rs new file mode 100644 index 0000000000..6c48c5e156 --- /dev/null +++ b/third_party/rust/tokio/src/fs/read_link.rs @@ -0,0 +1,14 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::{Path, PathBuf}; + +/// Reads a symbolic link, returning the file that the link points to. +/// +/// This is an async version of [`std::fs::read_link`][std] +/// +/// [std]: std::fs::read_link +pub async fn read_link(path: impl AsRef<Path>) -> io::Result<PathBuf> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::read_link(path)).await +} diff --git a/third_party/rust/tokio/src/fs/read_to_string.rs b/third_party/rust/tokio/src/fs/read_to_string.rs new file mode 100644 index 0000000000..26228d98c2 --- /dev/null +++ b/third_party/rust/tokio/src/fs/read_to_string.rs @@ -0,0 +1,30 @@ +use crate::fs::asyncify; + +use std::{io, path::Path}; + +/// Creates a future which will open a file for reading and read the entire +/// contents into a string and return said string. +/// +/// This is the async equivalent of [`std::fs::read_to_string`][std]. +/// +/// This operation is implemented by running the equivalent blocking operation +/// on a separate thread pool using [`spawn_blocking`]. +/// +/// [`spawn_blocking`]: crate::task::spawn_blocking +/// [std]: fn@std::fs::read_to_string +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// +/// # async fn dox() -> std::io::Result<()> { +/// let contents = fs::read_to_string("foo.txt").await?; +/// println!("foo.txt contains {} bytes", contents.len()); +/// # Ok(()) +/// # } +/// ``` +pub async fn read_to_string(path: impl AsRef<Path>) -> io::Result<String> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::read_to_string(path)).await +} diff --git a/third_party/rust/tokio/src/fs/remove_dir.rs b/third_party/rust/tokio/src/fs/remove_dir.rs new file mode 100644 index 0000000000..6e7cbd08f6 --- /dev/null +++ b/third_party/rust/tokio/src/fs/remove_dir.rs @@ -0,0 +1,12 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Removes an existing, empty directory. +/// +/// This is an async version of [`std::fs::remove_dir`](std::fs::remove_dir) +pub async fn remove_dir(path: impl AsRef<Path>) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::remove_dir(path)).await +} diff --git a/third_party/rust/tokio/src/fs/remove_dir_all.rs b/third_party/rust/tokio/src/fs/remove_dir_all.rs new file mode 100644 index 0000000000..0a237550f9 --- /dev/null +++ b/third_party/rust/tokio/src/fs/remove_dir_all.rs @@ -0,0 +1,14 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Removes a directory at this path, after removing all its contents. Use carefully! +/// +/// This is an async version of [`std::fs::remove_dir_all`][std] +/// +/// [std]: fn@std::fs::remove_dir_all +pub async fn remove_dir_all(path: impl AsRef<Path>) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::remove_dir_all(path)).await +} diff --git a/third_party/rust/tokio/src/fs/remove_file.rs b/third_party/rust/tokio/src/fs/remove_file.rs new file mode 100644 index 0000000000..d22a5bfc88 --- /dev/null +++ b/third_party/rust/tokio/src/fs/remove_file.rs @@ -0,0 +1,18 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Removes a file from the filesystem. +/// +/// Note that there is no guarantee that the file is immediately deleted (e.g. +/// depending on platform, other open file descriptors may prevent immediate +/// removal). +/// +/// This is an async version of [`std::fs::remove_file`][std] +/// +/// [std]: std::fs::remove_file +pub async fn remove_file(path: impl AsRef<Path>) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::remove_file(path)).await +} diff --git a/third_party/rust/tokio/src/fs/rename.rs b/third_party/rust/tokio/src/fs/rename.rs new file mode 100644 index 0000000000..4f980821d2 --- /dev/null +++ b/third_party/rust/tokio/src/fs/rename.rs @@ -0,0 +1,17 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Renames a file or directory to a new name, replacing the original file if +/// `to` already exists. +/// +/// This will not work if the new name is on a different mount point. +/// +/// This is an async version of [`std::fs::rename`](std::fs::rename) +pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> { + let from = from.as_ref().to_owned(); + let to = to.as_ref().to_owned(); + + asyncify(move || std::fs::rename(from, to)).await +} diff --git a/third_party/rust/tokio/src/fs/set_permissions.rs b/third_party/rust/tokio/src/fs/set_permissions.rs new file mode 100644 index 0000000000..09be02ea01 --- /dev/null +++ b/third_party/rust/tokio/src/fs/set_permissions.rs @@ -0,0 +1,15 @@ +use crate::fs::asyncify; + +use std::fs::Permissions; +use std::io; +use std::path::Path; + +/// Changes the permissions found on a file or a directory. +/// +/// This is an async version of [`std::fs::set_permissions`][std] +/// +/// [std]: fn@std::fs::set_permissions +pub async fn set_permissions(path: impl AsRef<Path>, perm: Permissions) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(|| std::fs::set_permissions(path, perm)).await +} diff --git a/third_party/rust/tokio/src/fs/symlink.rs b/third_party/rust/tokio/src/fs/symlink.rs new file mode 100644 index 0000000000..22ece7250f --- /dev/null +++ b/third_party/rust/tokio/src/fs/symlink.rs @@ -0,0 +1,18 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Creates a new symbolic link on the filesystem. +/// +/// The `dst` path will be a symbolic link pointing to the `src` path. +/// +/// This is an async version of [`std::os::unix::fs::symlink`][std] +/// +/// [std]: std::os::unix::fs::symlink +pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> { + let src = src.as_ref().to_owned(); + let dst = dst.as_ref().to_owned(); + + asyncify(move || std::os::unix::fs::symlink(src, dst)).await +} diff --git a/third_party/rust/tokio/src/fs/symlink_dir.rs b/third_party/rust/tokio/src/fs/symlink_dir.rs new file mode 100644 index 0000000000..6753c25eb7 --- /dev/null +++ b/third_party/rust/tokio/src/fs/symlink_dir.rs @@ -0,0 +1,19 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Creates a new directory symlink on the filesystem. +/// +/// The `dst` path will be a directory symbolic link pointing to the `src` +/// path. +/// +/// This is an async version of [`std::os::windows::fs::symlink_dir`][std] +/// +/// [std]: https://doc.rust-lang.org/std/os/windows/fs/fn.symlink_dir.html +pub async fn symlink_dir(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> { + let src = src.as_ref().to_owned(); + let dst = dst.as_ref().to_owned(); + + asyncify(move || std::os::windows::fs::symlink_dir(src, dst)).await +} diff --git a/third_party/rust/tokio/src/fs/symlink_file.rs b/third_party/rust/tokio/src/fs/symlink_file.rs new file mode 100644 index 0000000000..623352a1bd --- /dev/null +++ b/third_party/rust/tokio/src/fs/symlink_file.rs @@ -0,0 +1,19 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Creates a new file symbolic link on the filesystem. +/// +/// The `dst` path will be a file symbolic link pointing to the `src` +/// path. +/// +/// This is an async version of [`std::os::windows::fs::symlink_file`][std] +/// +/// [std]: https://doc.rust-lang.org/std/os/windows/fs/fn.symlink_file.html +pub async fn symlink_file(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> { + let src = src.as_ref().to_owned(); + let dst = dst.as_ref().to_owned(); + + asyncify(move || std::os::windows::fs::symlink_file(src, dst)).await +} diff --git a/third_party/rust/tokio/src/fs/symlink_metadata.rs b/third_party/rust/tokio/src/fs/symlink_metadata.rs new file mode 100644 index 0000000000..1d0df12576 --- /dev/null +++ b/third_party/rust/tokio/src/fs/symlink_metadata.rs @@ -0,0 +1,15 @@ +use crate::fs::asyncify; + +use std::fs::Metadata; +use std::io; +use std::path::Path; + +/// Queries the file system metadata for a path. +/// +/// This is an async version of [`std::fs::symlink_metadata`][std] +/// +/// [std]: fn@std::fs::symlink_metadata +pub async fn symlink_metadata(path: impl AsRef<Path>) -> io::Result<Metadata> { + let path = path.as_ref().to_owned(); + asyncify(|| std::fs::symlink_metadata(path)).await +} diff --git a/third_party/rust/tokio/src/fs/try_exists.rs b/third_party/rust/tokio/src/fs/try_exists.rs new file mode 100644 index 0000000000..069518bf94 --- /dev/null +++ b/third_party/rust/tokio/src/fs/try_exists.rs @@ -0,0 +1,34 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Returns `Ok(true)` if the path points at an existing entity. +/// +/// This function will traverse symbolic links to query information about the +/// destination file. In case of broken symbolic links this will return `Ok(false)`. +/// +/// This is the async equivalent of [`std::path::Path::try_exists`][std]. +/// +/// [std]: fn@std::path::Path::try_exists +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// +/// # async fn dox() -> std::io::Result<()> { +/// fs::try_exists("foo.txt").await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn try_exists(path: impl AsRef<Path>) -> io::Result<bool> { + let path = path.as_ref().to_owned(); + // std's Path::try_exists is not available for current Rust min supported version. + // Current implementation is based on its internal implementation instead. + match asyncify(move || std::fs::metadata(path)).await { + Ok(_) => Ok(true), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(false), + Err(error) => Err(error), + } +} diff --git a/third_party/rust/tokio/src/fs/write.rs b/third_party/rust/tokio/src/fs/write.rs new file mode 100644 index 0000000000..28606fb363 --- /dev/null +++ b/third_party/rust/tokio/src/fs/write.rs @@ -0,0 +1,31 @@ +use crate::fs::asyncify; + +use std::{io, path::Path}; + +/// Creates a future that will open a file for writing and write the entire +/// contents of `contents` to it. +/// +/// This is the async equivalent of [`std::fs::write`][std]. +/// +/// This operation is implemented by running the equivalent blocking operation +/// on a separate thread pool using [`spawn_blocking`]. +/// +/// [`spawn_blocking`]: crate::task::spawn_blocking +/// [std]: fn@std::fs::write +/// +/// # Examples +/// +/// ```no_run +/// use tokio::fs; +/// +/// # async fn dox() -> std::io::Result<()> { +/// fs::write("foo.txt", b"Hello world!").await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Result<()> { + let path = path.as_ref().to_owned(); + let contents = contents.as_ref().to_owned(); + + asyncify(move || std::fs::write(path, contents)).await +} |