diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/audioipc/src/rpc | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/audioipc/src/rpc')
-rw-r--r-- | third_party/rust/audioipc/src/rpc/client/mod.rs | 162 | ||||
-rw-r--r-- | third_party/rust/audioipc/src/rpc/client/proxy.rs | 136 | ||||
-rw-r--r-- | third_party/rust/audioipc/src/rpc/driver.rs | 171 | ||||
-rw-r--r-- | third_party/rust/audioipc/src/rpc/mod.rs | 36 | ||||
-rw-r--r-- | third_party/rust/audioipc/src/rpc/server.rs | 184 |
5 files changed, 689 insertions, 0 deletions
diff --git a/third_party/rust/audioipc/src/rpc/client/mod.rs b/third_party/rust/audioipc/src/rpc/client/mod.rs new file mode 100644 index 0000000000..0fb8aed565 --- /dev/null +++ b/third_party/rust/audioipc/src/rpc/client/mod.rs @@ -0,0 +1,162 @@ +// This is a derived version of simple/pipeline/client.rs from +// tokio_proto crate used under MIT license. +// +// Original version of client.rs: +// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3 +// +// The following modifications were made: +// * Simplify the code to implement RPC for pipeline requests that +// contain simple request/response messages: +// * Remove `Error` types, +// * Remove `bind_transport` fn & `BindTransport` type, +// * Remove all "Lift"ing functionality. +// * Remove `Service` trait since audioipc doesn't use `tokio_service` +// crate. +// +// Copyright (c) 2016 Tokio contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::rpc::driver::Driver; +use crate::rpc::Handler; +use futures::sync::oneshot; +use futures::{Async, Future, Poll, Sink, Stream}; +use std::collections::VecDeque; +use std::io; +use tokio::runtime::current_thread; + +mod proxy; + +pub use self::proxy::{ClientProxy, Response}; + +pub fn bind_client<C>(transport: C::Transport) -> proxy::ClientProxy<C::Request, C::Response> +where + C: Client, +{ + let (tx, rx) = proxy::channel(); + + let fut = { + let handler = ClientHandler::<C> { + transport, + requests: rx, + in_flight: VecDeque::with_capacity(32), + }; + Driver::new(handler) + }; + + // Spawn the RPC driver into task + current_thread::spawn(fut.map_err(|_| ())); + + tx +} + +pub trait Client: 'static { + /// Request + type Request: 'static; + + /// Response + type Response: 'static; + + /// The message transport, which works with async I/O objects of type `A` + type Transport: 'static + + Stream<Item = Self::Response, Error = io::Error> + + Sink<SinkItem = Self::Request, SinkError = io::Error>; +} + +//////////////////////////////////////////////////////////////////////////////// + +struct ClientHandler<C> +where + C: Client, +{ + transport: C::Transport, + requests: proxy::Receiver<C::Request, C::Response>, + in_flight: VecDeque<oneshot::Sender<C::Response>>, +} + +impl<C> Handler for ClientHandler<C> +where + C: Client, +{ + type In = C::Response; + type Out = C::Request; + type Transport = C::Transport; + + fn transport(&mut self) -> &mut Self::Transport { + &mut self.transport + } + + fn consume(&mut self, response: Self::In) -> io::Result<()> { + trace!("ClientHandler::consume"); + if let Some(complete) = self.in_flight.pop_front() { + drop(complete.send(response)); + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "request / response mismatch", + )); + } + + Ok(()) + } + + /// Produce a message + fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> { + trace!("ClientHandler::produce"); + + // Try to get a new request + match self.requests.poll() { + Ok(Async::Ready(Some((request, complete)))) => { + trace!(" --> received request"); + + // Track complete handle + self.in_flight.push_back(complete); + + Ok(Some(request).into()) + } + Ok(Async::Ready(None)) => { + trace!(" --> client dropped"); + Ok(None.into()) + } + Ok(Async::NotReady) => { + trace!(" --> not ready"); + Ok(Async::NotReady) + } + Err(_) => unreachable!(), + } + } + + /// RPC currently in flight + fn has_in_flight(&self) -> bool { + !self.in_flight.is_empty() + } +} + +impl<C: Client> Drop for ClientHandler<C> { + fn drop(&mut self) { + let _ = self.transport.close(); + self.in_flight.clear(); + } +} diff --git a/third_party/rust/audioipc/src/rpc/client/proxy.rs b/third_party/rust/audioipc/src/rpc/client/proxy.rs new file mode 100644 index 0000000000..bd44110d59 --- /dev/null +++ b/third_party/rust/audioipc/src/rpc/client/proxy.rs @@ -0,0 +1,136 @@ +// This is a derived version of client_proxy.rs from +// tokio_proto crate used under MIT license. +// +// Original version of client_proxy.rs: +// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3 +// +// The following modifications were made: +// * Remove `Service` trait since audioipc doesn't use `tokio_service` +// crate. +// * Remove `RefCell` from `ClientProxy` since cubeb is called from +// multiple threads. `mpsc::UnboundedSender` is thread safe. +// * Simplify the interface to just request (`R`) and response (`Q`) +// removing error (`E`). +// * Remove the `Envelope` type. +// * Renamed `pair` to `channel` to represent that an `rpc::channel` +// is being created. +// +// Original License: +// +// Copyright (c) 2016 Tokio contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Poll}; +use std::fmt; +use std::io; + +/// Message used to dispatch requests to the task managing the +/// client connection. +pub type Request<R, Q> = (R, oneshot::Sender<Q>); + +/// Receive requests submitted to the client +pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>; + +/// Response future returned from a client +pub struct Response<Q> { + inner: oneshot::Receiver<Q>, +} + +pub struct ClientProxy<R, Q> { + tx: mpsc::UnboundedSender<Request<R, Q>>, +} + +impl<R, Q> Clone for ClientProxy<R, Q> { + fn clone(&self) -> Self { + ClientProxy { + tx: self.tx.clone(), + } + } +} + +pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) { + // Create a channel to send the requests to client-side of rpc. + let (tx, rx) = mpsc::unbounded(); + + // Wrap the `tx` part in ClientProxy so the rpc call interface + // can be implemented. + let client = ClientProxy { tx }; + + (client, rx) +} + +impl<R, Q> ClientProxy<R, Q> { + pub fn call(&self, request: R) -> Response<Q> { + // The response to returned from the rpc client task over a + // oneshot channel. + let (tx, rx) = oneshot::channel(); + + // If send returns an Err, its because the other side has been dropped. + // By ignoring it, we are just dropping the `tx`, which will mean the + // rx will return Canceled when polled. In turn, that is translated + // into a BrokenPipe, which conveys the proper error. + let _ = self.tx.unbounded_send((request, tx)); + + Response { inner: rx } + } +} + +impl<R, Q> fmt::Debug for ClientProxy<R, Q> +where + R: fmt::Debug, + Q: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ClientProxy {{ ... }}") + } +} + +impl<Q> Future for Response<Q> { + type Item = Q; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Q, io::Error> { + match self.inner.poll() { + Ok(Async::Ready(res)) => Ok(Async::Ready(res)), + Ok(Async::NotReady) => Ok(Async::NotReady), + // Convert oneshot::Canceled into io::Error + Err(_) => { + let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"); + Err(e) + } + } + } +} + +impl<Q> fmt::Debug for Response<Q> +where + Q: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Response {{ ... }}") + } +} diff --git a/third_party/rust/audioipc/src/rpc/driver.rs b/third_party/rust/audioipc/src/rpc/driver.rs new file mode 100644 index 0000000000..0890fd138e --- /dev/null +++ b/third_party/rust/audioipc/src/rpc/driver.rs @@ -0,0 +1,171 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::rpc::Handler; +use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; +use std::fmt; +use std::io; + +pub struct Driver<T> +where + T: Handler, +{ + // Glue + handler: T, + + // True as long as the connection has more request frames to read. + run: bool, + + // True when the transport is fully flushed + is_flushed: bool, +} + +impl<T> Driver<T> +where + T: Handler, +{ + /// Create a new rpc driver with the given service and transport. + pub fn new(handler: T) -> Driver<T> { + Driver { + handler, + run: true, + is_flushed: true, + } + } + + /// Returns true if the driver has nothing left to do + fn is_done(&self) -> bool { + !self.run && self.is_flushed && !self.has_in_flight() + } + + /// Process incoming messages off the transport. + fn receive_incoming(&mut self) -> io::Result<()> { + while self.run { + if let Async::Ready(req) = self.handler.transport().poll()? { + self.process_incoming(req); + } else { + break; + } + } + Ok(()) + } + + /// Process an incoming message + fn process_incoming(&mut self, req: Option<T::In>) { + trace!("process_incoming"); + // At this point, the service & transport are ready to process the + // request, no matter what it is. + match req { + Some(message) => { + trace!("received message"); + + if let Err(e) = self.handler.consume(message) { + // TODO: Should handler be infalliable? + panic!("unimplemented error handling: {:?}", e); + } + } + None => { + trace!("received None"); + // At this point, we just return. This works + // because poll with be called again and go + // through the receive-cycle again. + self.run = false; + } + } + } + + /// Send outgoing messages to the transport. + fn send_outgoing(&mut self) -> io::Result<()> { + trace!("send_responses"); + loop { + match self.handler.produce()? { + Async::Ready(Some(message)) => { + trace!(" --> got message"); + self.process_outgoing(message)?; + } + Async::Ready(None) => { + trace!(" --> got None"); + // The service is done with the connection. + self.run = false; + break; + } + // Nothing to dispatch + Async::NotReady => break, + } + } + + Ok(()) + } + + fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> { + trace!("process_outgoing"); + assert_send(&mut self.handler.transport(), message)?; + + Ok(()) + } + + fn flush(&mut self) -> io::Result<()> { + self.is_flushed = self.handler.transport().poll_complete()?.is_ready(); + + // TODO: + Ok(()) + } + + fn has_in_flight(&self) -> bool { + self.handler.has_in_flight() + } +} + +impl<T> Future for Driver<T> +where + T: Handler, +{ + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), Self::Error> { + trace!("rpc::Driver::tick"); + + // First read off data from the socket + self.receive_incoming()?; + + // Handle completed responses + self.send_outgoing()?; + + // Try flushing buffered writes + self.flush()?; + + if self.is_done() { + trace!(" --> is done."); + return Ok(().into()); + } + + // Tick again later + Ok(Async::NotReady) + } +} + +fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> { + match s.start_send(item)? { + AsyncSink::Ready => Ok(()), + AsyncSink::NotReady(_) => panic!( + "sink reported itself as ready after `poll_ready` but was \ + then unable to accept a message" + ), + } +} + +impl<T> fmt::Debug for Driver<T> +where + T: Handler + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("rpc::Handler") + .field("handler", &self.handler) + .field("run", &self.run) + .field("is_flushed", &self.is_flushed) + .finish() + } +} diff --git a/third_party/rust/audioipc/src/rpc/mod.rs b/third_party/rust/audioipc/src/rpc/mod.rs new file mode 100644 index 0000000000..8d54fc6e00 --- /dev/null +++ b/third_party/rust/audioipc/src/rpc/mod.rs @@ -0,0 +1,36 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use futures::{Poll, Sink, Stream}; +use std::io; + +mod client; +mod driver; +mod server; + +pub use self::client::{bind_client, Client, ClientProxy, Response}; +pub use self::server::{bind_server, Server}; + +pub trait Handler { + /// Message type read from transport + type In; + /// Message type written to transport + type Out; + type Transport: 'static + + Stream<Item = Self::In, Error = io::Error> + + Sink<SinkItem = Self::Out, SinkError = io::Error>; + + /// Mutable reference to the transport + fn transport(&mut self) -> &mut Self::Transport; + + /// Consume a request + fn consume(&mut self, message: Self::In) -> io::Result<()>; + + /// Produce a response + fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error>; + + /// RPC currently in flight + fn has_in_flight(&self) -> bool; +} diff --git a/third_party/rust/audioipc/src/rpc/server.rs b/third_party/rust/audioipc/src/rpc/server.rs new file mode 100644 index 0000000000..1cb037bd8a --- /dev/null +++ b/third_party/rust/audioipc/src/rpc/server.rs @@ -0,0 +1,184 @@ +// This is a derived version of simple/pipeline/server.rs from +// tokio_proto crate used under MIT license. +// +// Original version of server.rs: +// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3 +// +// The following modifications were made: +// * Simplify the code to implement RPC for pipeline requests that +// contain simple request/response messages: +// * Remove `Error` types, +// * Remove `bind_transport` fn & `BindTransport` type, +// * Remove all "Lift"ing functionality. +// * Remove `Service` trait since audioipc doesn't use `tokio_service` +// crate. +// +// Copyright (c) 2016 Tokio contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::rpc::driver::Driver; +use crate::rpc::Handler; +use futures::{Async, Future, Poll, Sink, Stream}; +use std::collections::VecDeque; +use std::io; +use tokio::runtime::current_thread; + +/// Bind an async I/O object `io` to the `server`. +pub fn bind_server<S>(transport: S::Transport, server: S) +where + S: Server, +{ + let fut = { + let handler = ServerHandler { + server, + transport, + in_flight: VecDeque::with_capacity(32), + }; + Driver::new(handler) + }; + + // Spawn the RPC driver into task + current_thread::spawn(fut.map_err(|_| ())) +} + +pub trait Server: 'static { + /// Request + type Request: 'static; + + /// Response + type Response: 'static; + + /// Future + type Future: Future<Item = Self::Response, Error = ()>; + + /// The message transport, which works with async I/O objects of + /// type `A`. + type Transport: 'static + + Stream<Item = Self::Request, Error = io::Error> + + Sink<SinkItem = Self::Response, SinkError = io::Error>; + + /// Process the request and return the response asynchronously. + fn process(&mut self, req: Self::Request) -> Self::Future; +} + +//////////////////////////////////////////////////////////////////////////////// + +struct ServerHandler<S> +where + S: Server, +{ + // The service handling the connection + server: S, + // The transport responsible for sending/receving messages over the wire + transport: S::Transport, + // FIFO of "in flight" responses to requests. + in_flight: VecDeque<InFlight<S::Future>>, +} + +impl<S> Handler for ServerHandler<S> +where + S: Server, +{ + type In = S::Request; + type Out = S::Response; + type Transport = S::Transport; + + /// Mutable reference to the transport + fn transport(&mut self) -> &mut Self::Transport { + &mut self.transport + } + + /// Consume a message + fn consume(&mut self, request: Self::In) -> io::Result<()> { + trace!("ServerHandler::consume"); + let response = self.server.process(request); + self.in_flight.push_back(InFlight::Active(response)); + + // TODO: Should the error be handled differently? + Ok(()) + } + + /// Produce a message + fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> { + trace!("ServerHandler::produce"); + + // Make progress on pending responses + for pending in &mut self.in_flight { + pending.poll(); + } + + // Is the head of the queue ready? + match self.in_flight.front() { + Some(&InFlight::Done(_)) => {} + _ => { + trace!(" --> not ready"); + return Ok(Async::NotReady); + } + } + + // Return the ready response + match self.in_flight.pop_front() { + Some(InFlight::Done(res)) => { + trace!(" --> received response"); + Ok(Async::Ready(Some(res))) + } + _ => panic!(), + } + } + + /// RPC currently in flight + fn has_in_flight(&self) -> bool { + !self.in_flight.is_empty() + } +} + +impl<S: Server> Drop for ServerHandler<S> { + fn drop(&mut self) { + let _ = self.transport.close(); + self.in_flight.clear(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +enum InFlight<F: Future<Error = ()>> { + Active(F), + Done(F::Item), +} + +impl<F: Future<Error = ()>> InFlight<F> { + fn poll(&mut self) { + let res = match *self { + InFlight::Active(ref mut f) => match f.poll() { + Ok(Async::Ready(e)) => e, + Err(_) => unreachable!(), + Ok(Async::NotReady) => return, + }, + _ => return, + }; + *self = InFlight::Done(res); + } +} |