use std::fmt;
use std::fs::File;
use std::future::Future;
#[cfg(feature = "multipart")]
use std::io::Cursor;
use std::io::{self, Read};
use std::mem;
use std::ptr;
use bytes::buf::UninitSlice;
use bytes::Bytes;
use crate::async_impl;
/// The body of a `Request`.
///
/// In most cases, this is not needed directly, as the
/// [`RequestBuilder.body`][builder] method uses `Into
`, which allows
/// passing many things (like a string or vector of bytes).
///
/// [builder]: ./struct.RequestBuilder.html#method.body
#[derive(Debug)]
pub struct Body {
kind: Kind,
}
impl Body {
/// Instantiate a `Body` from a reader.
///
/// # Note
///
/// While allowing for many types to be used, these bodies do not have
/// a way to reset to the beginning and be reused. This means that when
/// encountering a 307 or 308 status code, instead of repeating the
/// request at the new location, the `Response` will be returned with
/// the redirect status code set.
///
/// ```rust
/// # use std::fs::File;
/// # use reqwest::blocking::Body;
/// # fn run() -> Result<(), Box> {
/// let file = File::open("national_secrets.txt")?;
/// let body = Body::new(file);
/// # Ok(())
/// # }
/// ```
///
/// If you have a set of bytes, like `String` or `Vec`, using the
/// `From` implementations for `Body` will store the data in a manner
/// it can be reused.
///
/// ```rust
/// # use reqwest::blocking::Body;
/// # fn run() -> Result<(), Box> {
/// let s = "A stringy body";
/// let body = Body::from(s);
/// # Ok(())
/// # }
/// ```
pub fn new(reader: R) -> Body {
Body {
kind: Kind::Reader(Box::from(reader), None),
}
}
/// Create a `Body` from a `Read` where the size is known in advance
/// but the data should not be fully loaded into memory. This will
/// set the `Content-Length` header and stream from the `Read`.
///
/// ```rust
/// # use std::fs::File;
/// # use reqwest::blocking::Body;
/// # fn run() -> Result<(), Box> {
/// let file = File::open("a_large_file.txt")?;
/// let file_size = file.metadata()?.len();
/// let body = Body::sized(file, file_size);
/// # Ok(())
/// # }
/// ```
pub fn sized(reader: R, len: u64) -> Body {
Body {
kind: Kind::Reader(Box::from(reader), Some(len)),
}
}
/// Returns the body as a byte slice if the body is already buffered in
/// memory. For streamed requests this method returns `None`.
pub fn as_bytes(&self) -> Option<&[u8]> {
match self.kind {
Kind::Reader(_, _) => None,
Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
}
}
/// Converts streamed requests to their buffered equivalent and
/// returns a reference to the buffer. If the request is already
/// buffered, this has no effect.
///
/// Be aware that for large requests this method is expensive
/// and may cause your program to run out of memory.
pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
match self.kind {
Kind::Reader(ref mut reader, maybe_len) => {
let mut bytes = if let Some(len) = maybe_len {
Vec::with_capacity(len as usize)
} else {
Vec::new()
};
io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
self.kind = Kind::Bytes(bytes.into());
self.buffer()
}
Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
}
}
#[cfg(feature = "multipart")]
pub(crate) fn len(&self) -> Option {
match self.kind {
Kind::Reader(_, len) => len,
Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
}
}
#[cfg(feature = "multipart")]
pub(crate) fn into_reader(self) -> Reader {
match self.kind {
Kind::Reader(r, _) => Reader::Reader(r),
Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
}
}
pub(crate) fn into_async(self) -> (Option, async_impl::Body, Option) {
match self.kind {
Kind::Reader(read, len) => {
let (tx, rx) = hyper::Body::channel();
let tx = Sender {
body: (read, len),
tx,
};
(Some(tx), async_impl::Body::wrap(rx), len)
}
Kind::Bytes(chunk) => {
let len = chunk.len() as u64;
(None, async_impl::Body::reusable(chunk), Some(len))
}
}
}
pub(crate) fn try_clone(&self) -> Option {
self.kind.try_clone().map(|kind| Body { kind })
}
}
enum Kind {
Reader(Box, Option),
Bytes(Bytes),
}
impl Kind {
fn try_clone(&self) -> Option {
match self {
Kind::Reader(..) => None,
Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
}
}
}
impl From> for Body {
#[inline]
fn from(v: Vec) -> Body {
Body {
kind: Kind::Bytes(v.into()),
}
}
}
impl From for Body {
#[inline]
fn from(s: String) -> Body {
s.into_bytes().into()
}
}
impl From<&'static [u8]> for Body {
#[inline]
fn from(s: &'static [u8]) -> Body {
Body {
kind: Kind::Bytes(Bytes::from_static(s)),
}
}
}
impl From<&'static str> for Body {
#[inline]
fn from(s: &'static str) -> Body {
s.as_bytes().into()
}
}
impl From for Body {
#[inline]
fn from(f: File) -> Body {
let len = f.metadata().map(|m| m.len()).ok();
Body {
kind: Kind::Reader(Box::new(f), len),
}
}
}
impl From for Body {
#[inline]
fn from(b: Bytes) -> Body {
Body {
kind: Kind::Bytes(b),
}
}
}
impl fmt::Debug for Kind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Kind::Reader(_, ref v) => f
.debug_struct("Reader")
.field("length", &DebugLength(v))
.finish(),
Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
}
}
}
struct DebugLength<'a>(&'a Option);
impl<'a> fmt::Debug for DebugLength<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self.0 {
Some(ref len) => fmt::Debug::fmt(len, f),
None => f.write_str("Unknown"),
}
}
}
#[cfg(feature = "multipart")]
pub(crate) enum Reader {
Reader(Box),
Bytes(Cursor),
}
#[cfg(feature = "multipart")]
impl Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> io::Result {
match *self {
Reader::Reader(ref mut rdr) => rdr.read(buf),
Reader::Bytes(ref mut rdr) => rdr.read(buf),
}
}
}
pub(crate) struct Sender {
body: (Box, Option),
tx: hyper::body::Sender,
}
async fn send_future(sender: Sender) -> Result<(), crate::Error> {
use bytes::{BufMut, BytesMut};
use std::cmp;
let con_len = sender.body.1;
let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
let mut written = 0;
let mut buf = BytesMut::with_capacity(cap as usize);
let mut body = sender.body.0;
// Put in an option so that it can be consumed on error to call abort()
let mut tx = Some(sender.tx);
loop {
if Some(written) == con_len {
// Written up to content-length, so stop.
return Ok(());
}
// The input stream is read only if the buffer is empty so
// that there is only one read in the buffer at any time.
//
// We need to know whether there is any data to send before
// we check the transmission channel (with poll_ready below)
// because somestimes the receiver disappears as soon as is
// considers the data is completely transmitted, which may
// be true.
//
// The use case is a web server that closes its
// input stream as soon as the data received is valid JSON.
// This behaviour is questionable, but it exists and the
// fact is that there is actually no remaining data to read.
if buf.is_empty() {
if buf.remaining_mut() == 0 {
buf.reserve(8192);
// zero out the reserved memory
let uninit = buf.chunk_mut();
unsafe {
ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
}
}
let bytes = unsafe { mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut()) };
match body.read(bytes) {
Ok(0) => {
// The buffer was empty and nothing's left to
// read. Return.
return Ok(());
}
Ok(n) => unsafe {
buf.advance_mut(n);
},
Err(e) => {
tx.take().expect("tx only taken on error").abort();
return Err(crate::error::body(e));
}
}
}
// The only way to get here is when the buffer is not empty.
// We can check the transmission channel
let buf_len = buf.len() as u64;
tx.as_mut()
.expect("tx only taken on error")
.send_data(buf.split().freeze())
.await
.map_err(crate::error::body)?;
written += buf_len;
}
}
impl Sender {
// A `Future` that may do blocking read calls.
// As a `Future`, this integrates easily with `wait::timeout`.
pub(crate) fn send(self) -> impl Future