summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/ffi/body.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/ffi/body.rs')
-rw-r--r--third_party/rust/hyper/src/ffi/body.rs229
1 files changed, 229 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/ffi/body.rs b/third_party/rust/hyper/src/ffi/body.rs
new file mode 100644
index 0000000000..39ba5beffb
--- /dev/null
+++ b/third_party/rust/hyper/src/ffi/body.rs
@@ -0,0 +1,229 @@
+use std::ffi::c_void;
+use std::mem::ManuallyDrop;
+use std::ptr;
+use std::task::{Context, Poll};
+
+use http::HeaderMap;
+use libc::{c_int, size_t};
+
+use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType};
+use super::{UserDataPointer, HYPER_ITER_CONTINUE};
+use crate::body::{Body, Bytes, HttpBody as _};
+
+/// A streaming HTTP body.
+pub struct hyper_body(pub(super) Body);
+
+/// A buffer of bytes that is sent or received on a `hyper_body`.
+pub struct hyper_buf(pub(crate) Bytes);
+
+pub(crate) struct UserBody {
+ data_func: hyper_body_data_callback,
+ userdata: *mut c_void,
+}
+
+// ===== Body =====
+
+type hyper_body_foreach_callback = extern "C" fn(*mut c_void, *const hyper_buf) -> c_int;
+
+type hyper_body_data_callback =
+ extern "C" fn(*mut c_void, *mut hyper_context<'_>, *mut *mut hyper_buf) -> c_int;
+
+ffi_fn! {
+ /// Create a new "empty" body.
+ ///
+ /// If not configured, this body acts as an empty payload.
+ fn hyper_body_new() -> *mut hyper_body {
+ Box::into_raw(Box::new(hyper_body(Body::empty())))
+ } ?= ptr::null_mut()
+}
+
+ffi_fn! {
+ /// Free a `hyper_body *`.
+ fn hyper_body_free(body: *mut hyper_body) {
+ drop(non_null!(Box::from_raw(body) ?= ()));
+ }
+}
+
+ffi_fn! {
+ /// Return a task that will poll the body for the next buffer of data.
+ ///
+ /// The task value may have different types depending on the outcome:
+ ///
+ /// - `HYPER_TASK_BUF`: Success, and more data was received.
+ /// - `HYPER_TASK_ERROR`: An error retrieving the data.
+ /// - `HYPER_TASK_EMPTY`: The body has finished streaming data.
+ ///
+ /// This does not consume the `hyper_body *`, so it may be used to again.
+ /// However, it MUST NOT be used or freed until the related task completes.
+ fn hyper_body_data(body: *mut hyper_body) -> *mut hyper_task {
+ // This doesn't take ownership of the Body, so don't allow destructor
+ let mut body = ManuallyDrop::new(non_null!(Box::from_raw(body) ?= ptr::null_mut()));
+
+ Box::into_raw(hyper_task::boxed(async move {
+ body.0.data().await.map(|res| res.map(hyper_buf))
+ }))
+ } ?= ptr::null_mut()
+}
+
+ffi_fn! {
+ /// Return a task that will poll the body and execute the callback with each
+ /// body chunk that is received.
+ ///
+ /// The `hyper_buf` pointer is only a borrowed reference, it cannot live outside
+ /// the execution of the callback. You must make a copy to retain it.
+ ///
+ /// The callback should return `HYPER_ITER_CONTINUE` to continue iterating
+ /// chunks as they are received, or `HYPER_ITER_BREAK` to cancel.
+ ///
+ /// This will consume the `hyper_body *`, you shouldn't use it anymore or free it.
+ fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut hyper_task {
+ let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut());
+ let userdata = UserDataPointer(userdata);
+
+ Box::into_raw(hyper_task::boxed(async move {
+ while let Some(item) = body.0.data().await {
+ let chunk = item?;
+ if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) {
+ return Err(crate::Error::new_user_aborted_by_callback());
+ }
+ }
+ Ok(())
+ }))
+ } ?= ptr::null_mut()
+}
+
+ffi_fn! {
+ /// Set userdata on this body, which will be passed to callback functions.
+ fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) {
+ let b = non_null!(&mut *body ?= ());
+ b.0.as_ffi_mut().userdata = userdata;
+ }
+}
+
+ffi_fn! {
+ /// Set the data callback for this body.
+ ///
+ /// The callback is called each time hyper needs to send more data for the
+ /// body. It is passed the value from `hyper_body_set_userdata`.
+ ///
+ /// If there is data available, the `hyper_buf **` argument should be set
+ /// to a `hyper_buf *` containing the data, and `HYPER_POLL_READY` should
+ /// be returned.
+ ///
+ /// Returning `HYPER_POLL_READY` while the `hyper_buf **` argument points
+ /// to `NULL` will indicate the body has completed all data.
+ ///
+ /// If there is more data to send, but it isn't yet available, a
+ /// `hyper_waker` should be saved from the `hyper_context *` argument, and
+ /// `HYPER_POLL_PENDING` should be returned. You must wake the saved waker
+ /// to signal the task when data is available.
+ ///
+ /// If some error has occurred, you can return `HYPER_POLL_ERROR` to abort
+ /// the body.
+ fn hyper_body_set_data_func(body: *mut hyper_body, func: hyper_body_data_callback) {
+ let b = non_null!{ &mut *body ?= () };
+ b.0.as_ffi_mut().data_func = func;
+ }
+}
+
+// ===== impl UserBody =====
+
+impl UserBody {
+ pub(crate) fn new() -> UserBody {
+ UserBody {
+ data_func: data_noop,
+ userdata: std::ptr::null_mut(),
+ }
+ }
+
+ pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
+ let mut out = std::ptr::null_mut();
+ match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) {
+ super::task::HYPER_POLL_READY => {
+ if out.is_null() {
+ Poll::Ready(None)
+ } else {
+ let buf = unsafe { Box::from_raw(out) };
+ Poll::Ready(Some(Ok(buf.0)))
+ }
+ }
+ super::task::HYPER_POLL_PENDING => Poll::Pending,
+ super::task::HYPER_POLL_ERROR => {
+ Poll::Ready(Some(Err(crate::Error::new_body_write_aborted())))
+ }
+ unexpected => Poll::Ready(Some(Err(crate::Error::new_body_write(format!(
+ "unexpected hyper_body_data_func return code {}",
+ unexpected
+ ))))),
+ }
+ }
+
+ pub(crate) fn poll_trailers(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll<crate::Result<Option<HeaderMap>>> {
+ Poll::Ready(Ok(None))
+ }
+}
+
+/// cbindgen:ignore
+extern "C" fn data_noop(
+ _userdata: *mut c_void,
+ _: *mut hyper_context<'_>,
+ _: *mut *mut hyper_buf,
+) -> c_int {
+ super::task::HYPER_POLL_READY
+}
+
+unsafe impl Send for UserBody {}
+unsafe impl Sync for UserBody {}
+
+// ===== Bytes =====
+
+ffi_fn! {
+ /// Create a new `hyper_buf *` by copying the provided bytes.
+ ///
+ /// This makes an owned copy of the bytes, so the `buf` argument can be
+ /// freed or changed afterwards.
+ ///
+ /// This returns `NULL` if allocating a new buffer fails.
+ fn hyper_buf_copy(buf: *const u8, len: size_t) -> *mut hyper_buf {
+ let slice = unsafe {
+ std::slice::from_raw_parts(buf, len)
+ };
+ Box::into_raw(Box::new(hyper_buf(Bytes::copy_from_slice(slice))))
+ } ?= ptr::null_mut()
+}
+
+ffi_fn! {
+ /// Get a pointer to the bytes in this buffer.
+ ///
+ /// This should be used in conjunction with `hyper_buf_len` to get the length
+ /// of the bytes data.
+ ///
+ /// This pointer is borrowed data, and not valid once the `hyper_buf` is
+ /// consumed/freed.
+ fn hyper_buf_bytes(buf: *const hyper_buf) -> *const u8 {
+ unsafe { (*buf).0.as_ptr() }
+ } ?= ptr::null()
+}
+
+ffi_fn! {
+ /// Get the length of the bytes this buffer contains.
+ fn hyper_buf_len(buf: *const hyper_buf) -> size_t {
+ unsafe { (*buf).0.len() }
+ }
+}
+
+ffi_fn! {
+ /// Free this buffer.
+ fn hyper_buf_free(buf: *mut hyper_buf) {
+ drop(unsafe { Box::from_raw(buf) });
+ }
+}
+
+unsafe impl AsTaskType for hyper_buf {
+ fn as_task_type(&self) -> hyper_task_return_type {
+ hyper_task_return_type::HYPER_TASK_BUF
+ }
+}