summaryrefslogtreecommitdiffstats
path: root/third_party/rust/audioipc/src/rpc/client
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/audioipc/src/rpc/client')
-rw-r--r--third_party/rust/audioipc/src/rpc/client/mod.rs162
-rw-r--r--third_party/rust/audioipc/src/rpc/client/proxy.rs136
2 files changed, 298 insertions, 0 deletions
diff --git a/third_party/rust/audioipc/src/rpc/client/mod.rs b/third_party/rust/audioipc/src/rpc/client/mod.rs
new file mode 100644
index 0000000000..0fb8aed565
--- /dev/null
+++ b/third_party/rust/audioipc/src/rpc/client/mod.rs
@@ -0,0 +1,162 @@
+// This is a derived version of simple/pipeline/client.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Simplify the code to implement RPC for pipeline requests that
+// contain simple request/response messages:
+// * Remove `Error` types,
+// * Remove `bind_transport` fn & `BindTransport` type,
+// * Remove all "Lift"ing functionality.
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::rpc::driver::Driver;
+use crate::rpc::Handler;
+use futures::sync::oneshot;
+use futures::{Async, Future, Poll, Sink, Stream};
+use std::collections::VecDeque;
+use std::io;
+use tokio::runtime::current_thread;
+
+mod proxy;
+
+pub use self::proxy::{ClientProxy, Response};
+
+pub fn bind_client<C>(transport: C::Transport) -> proxy::ClientProxy<C::Request, C::Response>
+where
+ C: Client,
+{
+ let (tx, rx) = proxy::channel();
+
+ let fut = {
+ let handler = ClientHandler::<C> {
+ transport,
+ requests: rx,
+ in_flight: VecDeque::with_capacity(32),
+ };
+ Driver::new(handler)
+ };
+
+ // Spawn the RPC driver into task
+ current_thread::spawn(fut.map_err(|_| ()));
+
+ tx
+}
+
+pub trait Client: 'static {
+ /// Request
+ type Request: 'static;
+
+ /// Response
+ type Response: 'static;
+
+ /// The message transport, which works with async I/O objects of type `A`
+ type Transport: 'static
+ + Stream<Item = Self::Response, Error = io::Error>
+ + Sink<SinkItem = Self::Request, SinkError = io::Error>;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct ClientHandler<C>
+where
+ C: Client,
+{
+ transport: C::Transport,
+ requests: proxy::Receiver<C::Request, C::Response>,
+ in_flight: VecDeque<oneshot::Sender<C::Response>>,
+}
+
+impl<C> Handler for ClientHandler<C>
+where
+ C: Client,
+{
+ type In = C::Response;
+ type Out = C::Request;
+ type Transport = C::Transport;
+
+ fn transport(&mut self) -> &mut Self::Transport {
+ &mut self.transport
+ }
+
+ fn consume(&mut self, response: Self::In) -> io::Result<()> {
+ trace!("ClientHandler::consume");
+ if let Some(complete) = self.in_flight.pop_front() {
+ drop(complete.send(response));
+ } else {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "request / response mismatch",
+ ));
+ }
+
+ Ok(())
+ }
+
+ /// Produce a message
+ fn produce(&mut self) -> Poll<Option<Self::Out>, io::Error> {
+ trace!("ClientHandler::produce");
+
+ // Try to get a new request
+ match self.requests.poll() {
+ Ok(Async::Ready(Some((request, complete)))) => {
+ trace!(" --> received request");
+
+ // Track complete handle
+ self.in_flight.push_back(complete);
+
+ Ok(Some(request).into())
+ }
+ Ok(Async::Ready(None)) => {
+ trace!(" --> client dropped");
+ Ok(None.into())
+ }
+ Ok(Async::NotReady) => {
+ trace!(" --> not ready");
+ Ok(Async::NotReady)
+ }
+ Err(_) => unreachable!(),
+ }
+ }
+
+ /// RPC currently in flight
+ fn has_in_flight(&self) -> bool {
+ !self.in_flight.is_empty()
+ }
+}
+
+impl<C: Client> Drop for ClientHandler<C> {
+ fn drop(&mut self) {
+ let _ = self.transport.close();
+ self.in_flight.clear();
+ }
+}
diff --git a/third_party/rust/audioipc/src/rpc/client/proxy.rs b/third_party/rust/audioipc/src/rpc/client/proxy.rs
new file mode 100644
index 0000000000..bd44110d59
--- /dev/null
+++ b/third_party/rust/audioipc/src/rpc/client/proxy.rs
@@ -0,0 +1,136 @@
+// This is a derived version of client_proxy.rs from
+// tokio_proto crate used under MIT license.
+//
+// Original version of client_proxy.rs:
+// https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
+//
+// The following modifications were made:
+// * Remove `Service` trait since audioipc doesn't use `tokio_service`
+// crate.
+// * Remove `RefCell` from `ClientProxy` since cubeb is called from
+// multiple threads. `mpsc::UnboundedSender` is thread safe.
+// * Simplify the interface to just request (`R`) and response (`Q`)
+// removing error (`E`).
+// * Remove the `Envelope` type.
+// * Renamed `pair` to `channel` to represent that an `rpc::channel`
+// is being created.
+//
+// Original License:
+//
+// Copyright (c) 2016 Tokio contributors
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Poll};
+use std::fmt;
+use std::io;
+
+/// Message used to dispatch requests to the task managing the
+/// client connection.
+pub type Request<R, Q> = (R, oneshot::Sender<Q>);
+
+/// Receive requests submitted to the client
+pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
+
+/// Response future returned from a client
+pub struct Response<Q> {
+ inner: oneshot::Receiver<Q>,
+}
+
+pub struct ClientProxy<R, Q> {
+ tx: mpsc::UnboundedSender<Request<R, Q>>,
+}
+
+impl<R, Q> Clone for ClientProxy<R, Q> {
+ fn clone(&self) -> Self {
+ ClientProxy {
+ tx: self.tx.clone(),
+ }
+ }
+}
+
+pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
+ // Create a channel to send the requests to client-side of rpc.
+ let (tx, rx) = mpsc::unbounded();
+
+ // Wrap the `tx` part in ClientProxy so the rpc call interface
+ // can be implemented.
+ let client = ClientProxy { tx };
+
+ (client, rx)
+}
+
+impl<R, Q> ClientProxy<R, Q> {
+ pub fn call(&self, request: R) -> Response<Q> {
+ // The response to returned from the rpc client task over a
+ // oneshot channel.
+ let (tx, rx) = oneshot::channel();
+
+ // If send returns an Err, its because the other side has been dropped.
+ // By ignoring it, we are just dropping the `tx`, which will mean the
+ // rx will return Canceled when polled. In turn, that is translated
+ // into a BrokenPipe, which conveys the proper error.
+ let _ = self.tx.unbounded_send((request, tx));
+
+ Response { inner: rx }
+ }
+}
+
+impl<R, Q> fmt::Debug for ClientProxy<R, Q>
+where
+ R: fmt::Debug,
+ Q: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "ClientProxy {{ ... }}")
+ }
+}
+
+impl<Q> Future for Response<Q> {
+ type Item = Q;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Q, io::Error> {
+ match self.inner.poll() {
+ Ok(Async::Ready(res)) => Ok(Async::Ready(res)),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ // Convert oneshot::Canceled into io::Error
+ Err(_) => {
+ let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
+ Err(e)
+ }
+ }
+ }
+}
+
+impl<Q> fmt::Debug for Response<Q>
+where
+ Q: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Response {{ ... }}")
+ }
+}