diff options
Diffstat (limited to 'third_party/rust/tokio/src/fs/file.rs')
-rw-r--r-- | third_party/rust/tokio/src/fs/file.rs | 779 |
1 files changed, 779 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/fs/file.rs b/third_party/rust/tokio/src/fs/file.rs new file mode 100644 index 0000000000..2c38e8059f --- /dev/null +++ b/third_party/rust/tokio/src/fs/file.rs @@ -0,0 +1,779 @@ +//! Types for working with [`File`]. +//! +//! [`File`]: File + +use self::State::*; +use crate::fs::asyncify; +use crate::io::blocking::Buf; +use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; +use crate::sync::Mutex; + +use std::fmt; +use std::fs::{Metadata, Permissions}; +use std::future::Future; +use std::io::{self, Seek, SeekFrom}; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use std::task::Poll::*; + +#[cfg(test)] +use super::mocks::JoinHandle; +#[cfg(test)] +use super::mocks::MockFile as StdFile; +#[cfg(test)] +use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; +#[cfg(not(test))] +use crate::blocking::JoinHandle; +#[cfg(not(test))] +use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; +#[cfg(not(test))] +use std::fs::File as StdFile; + +/// A reference to an open file on the filesystem. +/// +/// This is a specialized version of [`std::fs::File`][std] for usage from the +/// Tokio runtime. +/// +/// An instance of a `File` can be read and/or written depending on what options +/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical +/// cursor that the file contains internally. +/// +/// A file will not be closed immediately when it goes out of scope if there +/// are any IO operations that have not yet completed. To ensure that a file is +/// closed immediately when it is dropped, you should call [`flush`] before +/// dropping it. Note that this does not ensure that the file has been fully +/// written to disk; the operating system might keep the changes around in an +/// in-memory buffer. See the [`sync_all`] method for telling the OS to write +/// the data to disk. +/// +/// Reading and writing to a `File` is usually done using the convenience +/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. +/// +/// [std]: struct@std::fs::File +/// [`AsyncSeek`]: trait@crate::io::AsyncSeek +/// [`flush`]: fn@crate::io::AsyncWriteExt::flush +/// [`sync_all`]: fn@crate::fs::File::sync_all +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// +/// # Examples +/// +/// Create a new file and asynchronously write bytes to it: +/// +/// ```no_run +/// use tokio::fs::File; +/// use tokio::io::AsyncWriteExt; // for write_all() +/// +/// # async fn dox() -> std::io::Result<()> { +/// let mut file = File::create("foo.txt").await?; +/// file.write_all(b"hello, world!").await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// Read the contents of a file into a buffer: +/// +/// ```no_run +/// use tokio::fs::File; +/// use tokio::io::AsyncReadExt; // for read_to_end() +/// +/// # async fn dox() -> std::io::Result<()> { +/// let mut file = File::open("foo.txt").await?; +/// +/// let mut contents = vec![]; +/// file.read_to_end(&mut contents).await?; +/// +/// println!("len = {}", contents.len()); +/// # Ok(()) +/// # } +/// ``` +pub struct File { + std: Arc<StdFile>, + inner: Mutex<Inner>, +} + +struct Inner { + state: State, + + /// Errors from writes/flushes are returned in write/flush calls. If a write + /// error is observed while performing a read, it is saved until the next + /// write / flush call. + last_write_err: Option<io::ErrorKind>, + + pos: u64, +} + +#[derive(Debug)] +enum State { + Idle(Option<Buf>), + Busy(JoinHandle<(Operation, Buf)>), +} + +#[derive(Debug)] +enum Operation { + Read(io::Result<usize>), + Write(io::Result<()>), + Seek(io::Result<u64>), +} + +impl File { + /// Attempts to open a file in read-only mode. + /// + /// See [`OpenOptions`] for more details. + /// + /// [`OpenOptions`]: super::OpenOptions + /// + /// # Errors + /// + /// This function will return an error if called from outside of the Tokio + /// runtime or if path does not already exist. Other errors may also be + /// returned according to OpenOptions::open. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncReadExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// + /// let mut contents = vec![]; + /// file.read_to_end(&mut contents).await?; + /// + /// println!("len = {}", contents.len()); + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait. + /// + /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end + /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt + pub async fn open(path: impl AsRef<Path>) -> io::Result<File> { + let path = path.as_ref().to_owned(); + let std = asyncify(|| StdFile::open(path)).await?; + + Ok(File::from_std(std)) + } + + /// Opens a file in write-only mode. + /// + /// This function will create a file if it does not exist, and will truncate + /// it if it does. + /// + /// See [`OpenOptions`] for more details. + /// + /// [`OpenOptions`]: super::OpenOptions + /// + /// # Errors + /// + /// Results in an error if called from outside of the Tokio runtime or if + /// the underlying [`create`] call results in an error. + /// + /// [`create`]: std::fs::File::create + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn create(path: impl AsRef<Path>) -> io::Result<File> { + let path = path.as_ref().to_owned(); + let std_file = asyncify(move || StdFile::create(path)).await?; + Ok(File::from_std(std_file)) + } + + /// Converts a [`std::fs::File`][std] to a [`tokio::fs::File`][file]. + /// + /// [std]: std::fs::File + /// [file]: File + /// + /// # Examples + /// + /// ```no_run + /// // This line could block. It is not recommended to do this on the Tokio + /// // runtime. + /// let std_file = std::fs::File::open("foo.txt").unwrap(); + /// let file = tokio::fs::File::from_std(std_file); + /// ``` + pub fn from_std(std: StdFile) -> File { + File { + std: Arc::new(std), + inner: Mutex::new(Inner { + state: State::Idle(Some(Buf::with_capacity(0))), + last_write_err: None, + pos: 0, + }), + } + } + + /// Attempts to sync all OS-internal metadata to disk. + /// + /// This function will attempt to ensure that all in-core data reaches the + /// filesystem before returning. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.sync_all().await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn sync_all(&self) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; + + let std = self.std.clone(); + asyncify(move || std.sync_all()).await + } + + /// This function is similar to `sync_all`, except that it may not + /// synchronize file metadata to the filesystem. + /// + /// This is intended for use cases that must synchronize content, but don't + /// need the metadata on disk. The goal of this method is to reduce disk + /// operations. + /// + /// Note that some platforms may simply implement this in terms of `sync_all`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.sync_data().await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn sync_data(&self) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; + + let std = self.std.clone(); + asyncify(move || std.sync_data()).await + } + + /// Truncates or extends the underlying file, updating the size of this file to become size. + /// + /// If the size is less than the current file's size, then the file will be + /// shrunk. If it is greater than the current file's size, then the file + /// will be extended to size and have all of the intermediate data filled in + /// with 0s. + /// + /// # Errors + /// + /// This function will return an error if the file is not opened for + /// writing. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.set_len(10).await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + pub async fn set_len(&self, size: u64) -> io::Result<()> { + let mut inner = self.inner.lock().await; + inner.complete_inflight().await; + + let mut buf = match inner.state { + Idle(ref mut buf_cell) => buf_cell.take().unwrap(), + _ => unreachable!(), + }; + + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) + } else { + None + }; + + let std = self.std.clone(); + + inner.state = Busy(spawn_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| std.set_len(size)) + } else { + std.set_len(size) + } + .map(|_| 0); // the value is discarded later + + // Return the result as a seek + (Operation::Seek(res), buf) + })); + + let (op, buf) = match inner.state { + Idle(_) => unreachable!(), + Busy(ref mut rx) => rx.await?, + }; + + inner.state = Idle(Some(buf)); + + match op { + Operation::Seek(res) => res.map(|pos| { + inner.pos = pos; + }), + _ => unreachable!(), + } + } + + /// Queries metadata about the underlying file. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let metadata = file.metadata().await?; + /// + /// println!("{:?}", metadata); + /// # Ok(()) + /// # } + /// ``` + pub async fn metadata(&self) -> io::Result<Metadata> { + let std = self.std.clone(); + asyncify(move || std.metadata()).await + } + + /// Creates a new `File` instance that shares the same underlying file handle + /// as the existing `File` instance. Reads, writes, and seeks will affect both + /// File instances simultaneously. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let file_clone = file.try_clone().await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn try_clone(&self) -> io::Result<File> { + let std = self.std.clone(); + let std_file = asyncify(move || std.try_clone()).await?; + Ok(File::from_std(std_file)) + } + + /// Destructures `File` into a [`std::fs::File`][std]. This function is + /// async to allow any in-flight operations to complete. + /// + /// Use `File::try_into_std` to attempt conversion immediately. + /// + /// [std]: std::fs::File + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let tokio_file = File::open("foo.txt").await?; + /// let std_file = tokio_file.into_std().await; + /// # Ok(()) + /// # } + /// ``` + pub async fn into_std(mut self) -> StdFile { + self.inner.get_mut().complete_inflight().await; + Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed") + } + + /// Tries to immediately destructure `File` into a [`std::fs::File`][std]. + /// + /// [std]: std::fs::File + /// + /// # Errors + /// + /// This function will return an error containing the file if some + /// operation is in-flight. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let tokio_file = File::open("foo.txt").await?; + /// let std_file = tokio_file.try_into_std().unwrap(); + /// # Ok(()) + /// # } + /// ``` + pub fn try_into_std(mut self) -> Result<StdFile, Self> { + match Arc::try_unwrap(self.std) { + Ok(file) => Ok(file), + Err(std_file_arc) => { + self.std = std_file_arc; + Err(self) + } + } + } + + /// Changes the permissions on the underlying file. + /// + /// # Platform-specific behavior + /// + /// This function currently corresponds to the `fchmod` function on Unix and + /// the `SetFileInformationByHandle` function on Windows. Note that, this + /// [may change in the future][changes]. + /// + /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior + /// + /// # Errors + /// + /// This function will return an error if the user lacks permission change + /// attributes on the underlying file. It may also return an error in other + /// os-specific unspecified cases. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let mut perms = file.metadata().await?.permissions(); + /// perms.set_readonly(true); + /// file.set_permissions(perms).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { + let std = self.std.clone(); + asyncify(move || std.set_permissions(perm)).await + } +} + +impl AsyncRead for File { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + if !buf.is_empty() { + buf.copy_to(dst); + *buf_cell = Some(buf); + return Ready(Ok(())); + } + + buf.ensure_capacity_for(dst); + let std = me.std.clone(); + + inner.state = Busy(spawn_blocking(move || { + let res = buf.read_from(&mut &*std); + (Operation::Read(res), buf) + })); + } + Busy(ref mut rx) => { + let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match op { + Operation::Read(Ok(_)) => { + buf.copy_to(dst); + inner.state = Idle(Some(buf)); + return Ready(Ok(())); + } + Operation::Read(Err(e)) => { + assert!(buf.is_empty()); + + inner.state = Idle(Some(buf)); + return Ready(Err(e)); + } + Operation::Write(Ok(_)) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + continue; + } + Operation::Write(Err(e)) => { + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); + inner.state = Idle(Some(buf)); + } + Operation::Seek(result) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + if let Ok(pos) = result { + inner.pos = pos; + } + continue; + } + } + } + } + } + } +} + +impl AsyncSeek for File { + fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + loop { + match inner.state { + Busy(_) => panic!("must wait for poll_complete before calling start_seek"), + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + // Factor in any unread data from the buf + if !buf.is_empty() { + let n = buf.discard_read(); + + if let SeekFrom::Current(ref mut offset) = pos { + *offset += n; + } + } + + let std = me.std.clone(); + + inner.state = Busy(spawn_blocking(move || { + let res = (&*std).seek(pos); + (Operation::Seek(res), buf) + })); + return Ok(()); + } + } + } + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + let inner = self.inner.get_mut(); + + loop { + match inner.state { + Idle(_) => return Poll::Ready(Ok(inner.pos)), + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + inner.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => {} + Operation::Write(Err(e)) => { + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); + } + Operation::Write(_) => {} + Operation::Seek(res) => { + if let Ok(pos) = res { + inner.pos = pos; + } + return Ready(res); + } + } + } + } + } + } +} + +impl AsyncWrite for File { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + src: &[u8], + ) -> Poll<io::Result<usize>> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + if let Some(e) = inner.last_write_err.take() { + return Ready(Err(e.into())); + } + + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) + } else { + None + }; + + let n = buf.copy_from(src); + let std = me.std.clone(); + + let blocking_task_join_handle = spawn_mandatory_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; + + (Operation::Write(res), buf) + }) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "background task failed") + })?; + + inner.state = Busy(blocking_task_join_handle); + + return Ready(Ok(n)); + } + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + inner.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => { + // We don't care about the result here. The fact + // that the cursor has advanced will be reflected in + // the next iteration of the loop + continue; + } + Operation::Write(res) => { + // If the previous write was successful, continue. + // Otherwise, error. + res?; + continue; + } + Operation::Seek(_) => { + // Ignore the seek + continue; + } + } + } + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + let inner = self.inner.get_mut(); + inner.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + self.poll_flush(cx) + } +} + +impl From<StdFile> for File { + fn from(std: StdFile) -> Self { + Self::from_std(std) + } +} + +impl fmt::Debug for File { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("tokio::fs::File") + .field("std", &self.std) + .finish() + } +} + +#[cfg(unix)] +impl std::os::unix::io::AsRawFd for File { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.std.as_raw_fd() + } +} + +#[cfg(unix)] +impl std::os::unix::io::FromRawFd for File { + unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self { + StdFile::from_raw_fd(fd).into() + } +} + +#[cfg(windows)] +impl std::os::windows::io::AsRawHandle for File { + fn as_raw_handle(&self) -> std::os::windows::io::RawHandle { + self.std.as_raw_handle() + } +} + +#[cfg(windows)] +impl std::os::windows::io::FromRawHandle for File { + unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self { + StdFile::from_raw_handle(handle).into() + } +} + +impl Inner { + async fn complete_inflight(&mut self) { + use crate::future::poll_fn; + + if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { + self.last_write_err = Some(e.kind()); + } + } + + fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + if let Some(e) = self.last_write_err.take() { + return Ready(Err(e.into())); + } + + let (op, buf) = match self.state { + Idle(_) => return Ready(Ok(())), + Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, + }; + + // The buffer is not used here + self.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => Ready(Ok(())), + Operation::Write(res) => Ready(res), + Operation::Seek(_) => Ready(Ok(())), + } + } +} + +#[cfg(test)] +mod tests; |