summaryrefslogtreecommitdiffstats
path: root/third_party/rust/audioipc/src/rpc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/audioipc/src/rpc
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/audioipc/src/rpc')
-rw-r--r--third_party/rust/audioipc/src/rpc/client/mod.rs162
-rw-r--r--third_party/rust/audioipc/src/rpc/client/proxy.rs136
-rw-r--r--third_party/rust/audioipc/src/rpc/driver.rs171
-rw-r--r--third_party/rust/audioipc/src/rpc/mod.rs36
-rw-r--r--third_party/rust/audioipc/src/rpc/server.rs184
5 files changed, 689 insertions, 0 deletions
diff --git a/third_party/rust/audioipc/src/rpc/client/mod.rs b/third_party/rust/audioipc/src/rpc/client/mod.rs
new file mode 100644
index 0000000000..0fb8aed565
--- /dev/null
+++ b/third_party/rust/audioipc/src/rpc/client/mod.rs
@@ -0,0 +1,162 @@
+// This is a derived version of simple/pipeline/client.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Simplify the code to implement RPC for pipeline requests that
+// contain simple request/response messages:
+// * Remove `Error` types,
+// * Remove `bind_transport` fn & `BindTransport` type,
+// * Remove all "Lift"ing functionality.
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::rpc::driver::Driver;
+use crate::rpc::Handler;
+use futures::sync::oneshot;
+use futures::{Async, Future, Poll, Sink, Stream};
+use std::collections::VecDeque;
+use std::io;
+use tokio::runtime::current_thread;
+
+mod proxy;
+
+pub use self::proxy::{ClientProxy, Response};
+
+pub fn bind_client<C>(transport: C::Transport) -> proxy::ClientProxy<C::Request, C::Response>
+where
+ C: Client,
+{
+ let (tx, rx) = proxy::channel();
+
+ let fut = {
+ let handler = ClientHandler::<C> {
+ transport,
+ requests: rx,
+ in_flight: VecDeque::with_capacity(32),
+ };
+ Driver::new(handler)
+ };
+
+ // Spawn the RPC driver into task
+ current_thread::spawn(fut.map_err(|_| ()));
+
+ tx
+}
+
+pub trait Client: 'static {
+ /// Request
+ type Request: 'static;
+
+ /// Response
+ type Response: 'static;
+
+ /// The message transport, which works with async I/O objects of type `A`
+ type Transport: 'static
+ + Stream<Item = Self::Response, Error = io::Error>
+ + Sink<SinkItem = Self::Request, SinkError = io::Error>;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct ClientHandler<C>
+where
+ C: Client,
+{
+ transport: C::Transport,
+ requests: proxy::Receiver<C::Request, C::Response>,
+ in_flight: VecDeque<oneshot::Sender<C::Response>>,
+}
+
+impl<C> Handler for ClientHandler<C>
+where
+ C: Client,
+{
+ type In = C::Response;
+ type Out = C::Request;
+ type Transport = C::Transport;
+
+ fn transport(&mut self) -> &mut Self::Transport {
+ &mut self.transport
+ }
+
+ fn consume(&mut self, response: Self::In) -> io::Result<()> {
+ trace!("ClientHandler::consume");
+ if let Some(complete) = self.in_flight.pop_front() {
+ drop(complete.send(response));
+ } else {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "request / response mismatch",
+ ));
+ }
+
+ Ok(())
+ }
+
+ /// Produce a message
+ fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
+ trace!("ClientHandler::produce");
+
+ // Try to get a new request
+ match self.requests.poll() {
+ Ok(Async::Ready(Some((request, complete)))) => {
+ trace!(" --> received request");
+
+ // Track complete handle
+ self.in_flight.push_back(complete);
+
+ Ok(Some(request).into())
+ }
+ Ok(Async::Ready(None)) => {
+ trace!(" --> client dropped");
+ Ok(None.into())
+ }
+ Ok(Async::NotReady) => {
+ trace!(" --> not ready");
+ Ok(Async::NotReady)
+ }
+ Err(_) => unreachable!(),
+ }
+ }
+
+ /// RPC currently in flight
+ fn has_in_flight(&self) -> bool {
+ !self.in_flight.is_empty()
+ }
+}
+
+impl<C: Client> Drop for ClientHandler<C> {
+ fn drop(&mut self) {
+ let _ = self.transport.close();
+ self.in_flight.clear();
+ }
+}
diff --git a/third_party/rust/audioipc/src/rpc/client/proxy.rs b/third_party/rust/audioipc/src/rpc/client/proxy.rs
new file mode 100644
index 0000000000..bd44110d59
--- /dev/null
+++ b/third_party/rust/audioipc/src/rpc/client/proxy.rs
@@ -0,0 +1,136 @@
+// This is a derived version of client_proxy.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client_proxy.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+// * Remove `RefCell` from `ClientProxy` since cubeb is called from
+// multiple threads. `mpsc::UnboundedSender` is thread safe.
+// * Simplify the interface to just request (`R`) and response (`Q`)
+// removing error (`E`).
+// * Remove the `Envelope` type.
+// * Renamed `pair` to `channel` to represent that an `rpc::channel`
+// is being created.
+//
+// Original License:
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Poll};
+use std::fmt;
+use std::io;
+
+/// Message used to dispatch requests to the task managing the
+/// client connection.
+pub type Request<R, Q> = (R, oneshot::Sender<Q>);
+
+/// Receive requests submitted to the client
+pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
+
+/// Response future returned from a client
+pub struct Response<Q> {
+ inner: oneshot::Receiver<Q>,
+}
+
+pub struct ClientProxy<R, Q> {
+ tx: mpsc::UnboundedSender<Request<R, Q>>,
+}
+
+impl<R, Q> Clone for ClientProxy<R, Q> {
+ fn clone(&self) -> Self {
+ ClientProxy {
+ tx: self.tx.clone(),
+ }
+ }
+}
+
+pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
+ // Create a channel to send the requests to client-side of rpc.
+ let (tx, rx) = mpsc::unbounded();
+
+ // Wrap the `tx` part in ClientProxy so the rpc call interface
+ // can be implemented.
+ let client = ClientProxy { tx };
+
+ (client, rx)
+}
+
+impl<R, Q> ClientProxy<R, Q> {
+ pub fn call(&self, request: R) -> Response<Q> {
+ // The response to returned from the rpc client task over a
+ // oneshot channel.
+ let (tx, rx) = oneshot::channel();
+
+ // If send returns an Err, its because the other side has been dropped.
+ // By ignoring it, we are just dropping the `tx`, which will mean the
+ // rx will return Canceled when polled. In turn, that is translated
+ // into a BrokenPipe, which conveys the proper error.
+ let _ = self.tx.unbounded_send((request, tx));
+
+ Response { inner: rx }
+ }
+}
+
+impl<R, Q> fmt::Debug for ClientProxy<R, Q>
+where
+ R: fmt::Debug,
+ Q: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "ClientProxy {{ ... }}")
+ }
+}
+
+impl<Q> Future for Response<Q> {
+ type Item = Q;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Q, io::Error> {
+ match self.inner.poll() {
+ Ok(Async::Ready(res)) => Ok(Async::Ready(res)),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ // Convert oneshot::Canceled into io::Error
+ Err(_) => {
+ let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
+ Err(e)
+ }
+ }
+ }
+}
+
+impl<Q> fmt::Debug for Response<Q>
+where
+ Q: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Response {{ ... }}")
+ }
+}
diff --git a/third_party/rust/audioipc/src/rpc/driver.rs b/third_party/rust/audioipc/src/rpc/driver.rs
new file mode 100644
index 0000000000..0890fd138e
--- /dev/null
+++ b/third_party/rust/audioipc/src/rpc/driver.rs
@@ -0,0 +1,171 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crate::rpc::Handler;
+use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
+use std::fmt;
+use std::io;
+
+pub struct Driver<T>
+where
+ T: Handler,
+{
+ // Glue
+ handler: T,
+
+ // True as long as the connection has more request frames to read.
+ run: bool,
+
+ // True when the transport is fully flushed
+ is_flushed: bool,
+}
+
+impl<T> Driver<T>
+where
+ T: Handler,
+{
+ /// Create a new rpc driver with the given service and transport.
+ pub fn new(handler: T) -> Driver<T> {
+ Driver {
+ handler,
+ run: true,
+ is_flushed: true,
+ }
+ }
+
+ /// Returns true if the driver has nothing left to do
+ fn is_done(&self) -> bool {
+ !self.run && self.is_flushed && !self.has_in_flight()
+ }
+
+ /// Process incoming messages off the transport.
+ fn receive_incoming(&mut self) -> io::Result<()> {
+ while self.run {
+ if let Async::Ready(req) = self.handler.transport().poll()? {
+ self.process_incoming(req);
+ } else {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ /// Process an incoming message
+ fn process_incoming(&mut self, req: Option<T::In>) {
+ trace!("process_incoming");
+ // At this point, the service & transport are ready to process the
+ // request, no matter what it is.
+ match req {
+ Some(message) => {
+ trace!("received message");
+
+ if let Err(e) = self.handler.consume(message) {
+ // TODO: Should handler be infalliable?
+ panic!("unimplemented error handling: {:?}", e);
+ }
+ }
+ None => {
+ trace!("received None");
+ // At this point, we just return. This works
+ // because poll with be called again and go
+ // through the receive-cycle again.
+ self.run = false;
+ }
+ }
+ }
+
+ /// Send outgoing messages to the transport.
+ fn send_outgoing(&mut self) -> io::Result<()> {
+ trace!("send_responses");
+ loop {
+ match self.handler.produce()? {
+ Async::Ready(Some(message)) => {
+ trace!(" --> got message");
+ self.process_outgoing(message)?;
+ }
+ Async::Ready(None) => {
+ trace!(" --> got None");
+ // The service is done with the connection.
+ self.run = false;
+ break;
+ }
+ // Nothing to dispatch
+ Async::NotReady => break,
+ }
+ }
+
+ Ok(())
+ }
+
+ fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> {
+ trace!("process_outgoing");
+ assert_send(&mut self.handler.transport(), message)?;
+
+ Ok(())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.is_flushed = self.handler.transport().poll_complete()?.is_ready();
+
+ // TODO:
+ Ok(())
+ }
+
+ fn has_in_flight(&self) -> bool {
+ self.handler.has_in_flight()
+ }
+}
+
+impl<T> Future for Driver<T>
+where
+ T: Handler,
+{
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), Self::Error> {
+ trace!("rpc::Driver::tick");
+
+ // First read off data from the socket
+ self.receive_incoming()?;
+
+ // Handle completed responses
+ self.send_outgoing()?;
+
+ // Try flushing buffered writes
+ self.flush()?;
+
+ if self.is_done() {
+ trace!(" --> is done.");
+ return Ok(().into());
+ }
+
+ // Tick again later
+ Ok(Async::NotReady)
+ }
+}
+
+fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> {
+ match s.start_send(item)? {
+ AsyncSink::Ready => Ok(()),
+ AsyncSink::NotReady(_) => panic!(
+ "sink reported itself as ready after `poll_ready` but was \
+ then unable to accept a message"
+ ),
+ }
+}
+
+impl<T> fmt::Debug for Driver<T>
+where
+ T: Handler + fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("rpc::Handler")
+ .field("handler", &self.handler)
+ .field("run", &self.run)
+ .field("is_flushed", &self.is_flushed)
+ .finish()
+ }
+}
diff --git a/third_party/rust/audioipc/src/rpc/mod.rs b/third_party/rust/audioipc/src/rpc/mod.rs
new file mode 100644
index 0000000000..8d54fc6e00
--- /dev/null
+++ b/third_party/rust/audioipc/src/rpc/mod.rs
@@ -0,0 +1,36 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use futures::{Poll, Sink, Stream};
+use std::io;
+
+mod client;
+mod driver;
+mod server;
+
+pub use self::client::{bind_client, Client, ClientProxy, Response};
+pub use self::server::{bind_server, Server};
+
+pub trait Handler {
+ /// Message type read from transport
+ type In;
+ /// Message type written to transport
+ type Out;
+ type Transport: 'static
+ + Stream<Item = Self::In, Error = io::Error>
+ + Sink<SinkItem = Self::Out, SinkError = io::Error>;
+
+ /// Mutable reference to the transport
+ fn transport(&mut self) -> &mut Self::Transport;
+
+ /// Consume a request
+ fn consume(&mut self, message: Self::In) -> io::Result<()>;
+
+ /// Produce a response
+ fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error>;
+
+ /// RPC currently in flight
+ fn has_in_flight(&self) -> bool;
+}
diff --git a/third_party/rust/audioipc/src/rpc/server.rs b/third_party/rust/audioipc/src/rpc/server.rs
new file mode 100644
index 0000000000..1cb037bd8a
--- /dev/null
+++ b/third_party/rust/audioipc/src/rpc/server.rs
@@ -0,0 +1,184 @@
+// This is a derived version of simple/pipeline/server.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of server.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Simplify the code to implement RPC for pipeline requests that
+// contain simple request/response messages:
+// * Remove `Error` types,
+// * Remove `bind_transport` fn & `BindTransport` type,
+// * Remove all "Lift"ing functionality.
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::rpc::driver::Driver;
+use crate::rpc::Handler;
+use futures::{Async, Future, Poll, Sink, Stream};
+use std::collections::VecDeque;
+use std::io;
+use tokio::runtime::current_thread;
+
+/// Bind an async I/O object `io` to the `server`.
+pub fn bind_server<S>(transport: S::Transport, server: S)
+where
+ S: Server,
+{
+ let fut = {
+ let handler = ServerHandler {
+ server,
+ transport,
+ in_flight: VecDeque::with_capacity(32),
+ };
+ Driver::new(handler)
+ };
+
+ // Spawn the RPC driver into task
+ current_thread::spawn(fut.map_err(|_| ()))
+}
+
+pub trait Server: 'static {
+ /// Request
+ type Request: 'static;
+
+ /// Response
+ type Response: 'static;
+
+ /// Future
+ type Future: Future<Item = Self::Response, Error = ()>;
+
+ /// The message transport, which works with async I/O objects of
+ /// type `A`.
+ type Transport: 'static
+ + Stream<Item = Self::Request, Error = io::Error>
+ + Sink<SinkItem = Self::Response, SinkError = io::Error>;
+
+ /// Process the request and return the response asynchronously.
+ fn process(&mut self, req: Self::Request) -> Self::Future;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct ServerHandler<S>
+where
+ S: Server,
+{
+ // The service handling the connection
+ server: S,
+ // The transport responsible for sending/receving messages over the wire
+ transport: S::Transport,
+ // FIFO of "in flight" responses to requests.
+ in_flight: VecDeque<InFlight<S::Future>>,
+}
+
+impl<S> Handler for ServerHandler<S>
+where
+ S: Server,
+{
+ type In = S::Request;
+ type Out = S::Response;
+ type Transport = S::Transport;
+
+ /// Mutable reference to the transport
+ fn transport(&mut self) -> &mut Self::Transport {
+ &mut self.transport
+ }
+
+ /// Consume a message
+ fn consume(&mut self, request: Self::In) -> io::Result<()> {
+ trace!("ServerHandler::consume");
+ let response = self.server.process(request);
+ self.in_flight.push_back(InFlight::Active(response));
+
+ // TODO: Should the error be handled differently?
+ Ok(())
+ }
+
+ /// Produce a message
+ fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
+ trace!("ServerHandler::produce");
+
+ // Make progress on pending responses
+ for pending in &mut self.in_flight {
+ pending.poll();
+ }
+
+ // Is the head of the queue ready?
+ match self.in_flight.front() {
+ Some(&InFlight::Done(_)) => {}
+ _ => {
+ trace!(" --> not ready");
+ return Ok(Async::NotReady);
+ }
+ }
+
+ // Return the ready response
+ match self.in_flight.pop_front() {
+ Some(InFlight::Done(res)) => {
+ trace!(" --> received response");
+ Ok(Async::Ready(Some(res)))
+ }
+ _ => panic!(),
+ }
+ }
+
+ /// RPC currently in flight
+ fn has_in_flight(&self) -> bool {
+ !self.in_flight.is_empty()
+ }
+}
+
+impl<S: Server> Drop for ServerHandler<S> {
+ fn drop(&mut self) {
+ let _ = self.transport.close();
+ self.in_flight.clear();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+enum InFlight<F: Future<Error = ()>> {
+ Active(F),
+ Done(F::Item),
+}
+
+impl<F: Future<Error = ()>> InFlight<F> {
+ fn poll(&mut self) {
+ let res = match *self {
+ InFlight::Active(ref mut f) => match f.poll() {
+ Ok(Async::Ready(e)) => e,
+ Err(_) => unreachable!(),
+ Ok(Async::NotReady) => return,
+ },
+ _ => return,
+ };
+ *self = InFlight::Done(res);
+ }
+}