// Copyright © 2021 Mozilla Foundation // // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details use crossbeam_queue::ArrayQueue; use mio::Token; use std::cell::UnsafeCell; use std::collections::VecDeque; use std::io::{self, Error, ErrorKind, Result}; use std::marker::PhantomPinned; use std::mem::ManuallyDrop; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use crate::ipccore::EventLoopHandle; // This provides a safe-ish method for a thread to allocate // stack storage space for a result, then pass a (wrapped) // pointer to that location to another thread via // a CompletionWriter to eventually store a result into. struct Completion { item: UnsafeCell>, writer: AtomicBool, _pin: PhantomPinned, // disable rustc's no-alias } impl Completion { fn new() -> Self { Completion { item: UnsafeCell::new(None), writer: AtomicBool::new(false), _pin: PhantomPinned, } } // Wait until the writer completes, then return the result. // This is intended to be a single-use function, once the writer // has completed any further attempts to wait will return None. fn wait(&self) -> Option { // Wait for the writer to complete or be dropped. while self.writer.load(Ordering::Acquire) { std::thread::park(); } unsafe { (*self.item.get()).take() } } // Create a writer for the other thread to store the // expected result into. fn writer(&self) -> CompletionWriter { assert!(!self.writer.load(Ordering::Relaxed)); self.writer.store(true, Ordering::Release); CompletionWriter { ptr: self as *const _ as *mut _, waiter: std::thread::current(), } } } impl Drop for Completion { fn drop(&mut self) { // Wait for the outstanding writer to complete before // dropping, since the CompletionWriter references // memory owned by this object. while self.writer.load(Ordering::Acquire) { std::thread::park(); } } } struct CompletionWriter { ptr: *mut Completion, // Points to a Completion on another thread's stack waiter: std::thread::Thread, // Identifies thread waiting for completion } impl CompletionWriter { fn set(self, value: T) { // Store the result into the Completion's memory. // Since `set` consumes `self`, rely on `Drop` to // mark the writer as done and wake the Completion's // thread. unsafe { assert!((*self.ptr).writer.load(Ordering::Relaxed)); *(*self.ptr).item.get() = Some(value); } } } impl Drop for CompletionWriter { fn drop(&mut self) { // Mark writer as complete - if `set` was not called, // the waiter will receive `None`. unsafe { (*self.ptr).writer.store(false, Ordering::Release); } // Wake the Completion's thread. self.waiter.unpark(); } } // Safety: CompletionWriter holds a pointer to a Completion // residing on another thread's stack. The Completion always // waits for an outstanding writer if present, and CompletionWriter // releases the waiter and wakes the Completion's thread on drop, // so this pointer will always be live for the duration of a // CompletionWriter. unsafe impl Send for CompletionWriter {} // RPC message handler. Implemented by ClientHandler (for Client) // and ServerHandler (for Server). pub(crate) trait Handler { type In; type Out; // Consume a request fn consume(&mut self, request: Self::In) -> Result<()>; // Produce a response fn produce(&mut self) -> Result>; } // Client RPC definition. This supplies the expected message // request and response types. pub trait Client { type ServerMessage; type ClientMessage; } // Server RPC definition. This supplies the expected message // request and response types. `process` is passed inbound RPC // requests by the ServerHandler to be responded to by the server. pub trait Server { type ServerMessage; type ClientMessage; fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage; } // RPC Client Proxy implementation. type ProxyRequest = (Request, CompletionWriter); // RPC Proxy that may be `clone`d for use by multiple owners/threads. // A Proxy `call` arranges for the supplied request to be transmitted // to the associated Server via RPC and blocks awaiting the response // via the associated `Completion`. // A ClientHandler normally lives until the last Proxy is dropped, but if the ClientHandler // encounters an internal error, `requests` will fail to upgrade, allowing // the proxy to report an error. #[derive(Debug)] pub struct Proxy { handle: Option<(EventLoopHandle, Token)>, requests: ManuallyDrop>>, } impl Proxy { fn new(requests: RequestQueueSender>) -> Self { Self { handle: None, requests: ManuallyDrop::new(requests), } } pub fn call(&self, request: Request) -> Result { let response = Completion::new(); self.requests.push((request, response.writer()))?; self.wake_connection(); match response.wait() { Some(resp) => Ok(resp), None => Err(Error::new(ErrorKind::Other, "proxy recv error")), } } pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) { self.handle = Some((handle, token)); } fn wake_connection(&self) { let (handle, token) = self .handle .as_ref() .expect("proxy not connected to event loop"); handle.wake_connection(*token); } } impl Clone for Proxy { fn clone(&self) -> Self { let mut clone = Self::new((*self.requests).clone()); let (handle, token) = self .handle .as_ref() .expect("proxy not connected to event loop"); clone.connect_event_loop(handle.clone(), *token); clone } } impl Drop for Proxy { fn drop(&mut self) { trace!("Proxy drop, waking EventLoop"); // Must drop `requests` before waking the connection, otherwise // the wake may be processed before the (last) weak reference is // dropped. let last_proxy = self.requests.live_proxies(); unsafe { ManuallyDrop::drop(&mut self.requests); } if last_proxy == 1 && self.handle.is_some() { self.wake_connection() } } } const RPC_CLIENT_INITIAL_PROXIES: usize = 32; // Initial proxy pre-allocation per client. // Client-specific Handler implementation. // The IPC EventLoop Driver calls this to execute client-specific // RPC handling. Serialized messages sent via a Proxy are queued // for transmission when `produce` is called. // Deserialized messages are passed via `consume` to // trigger response completion by sending the response via a channel // connected to a ProxyResponse. pub(crate) struct ClientHandler { in_flight: VecDeque>, requests: Arc>>, } impl ClientHandler { fn new( requests: Arc>>, ) -> ClientHandler { ClientHandler:: { in_flight: VecDeque::with_capacity(RPC_CLIENT_INITIAL_PROXIES), requests, } } } impl Handler for ClientHandler { type In = C::ClientMessage; type Out = C::ServerMessage; fn consume(&mut self, response: Self::In) -> Result<()> { trace!("ClientHandler::consume"); if let Some(response_writer) = self.in_flight.pop_front() { response_writer.set(response); } else { return Err(Error::new(ErrorKind::Other, "request/response mismatch")); } Ok(()) } fn produce(&mut self) -> Result> { trace!("ClientHandler::produce"); // If the weak count is zero, no proxies are attached and // no further proxies can be attached since every proxy // after the initial one is cloned from an existing instance. self.requests.check_live_proxies()?; // Try to get a new message match self.requests.pop() { Some((request, response_writer)) => { trace!(" --> received request"); self.in_flight.push_back(response_writer); Ok(Some(request)) } None => { trace!(" --> no request"); Ok(None) } } } } #[derive(Debug)] pub(crate) struct RequestQueue { queue: ArrayQueue, } impl RequestQueue { pub(crate) fn new(size: usize) -> Self { RequestQueue { queue: ArrayQueue::new(size), } } pub(crate) fn pop(&self) -> Option { self.queue.pop() } pub(crate) fn new_sender(self: &Arc) -> RequestQueueSender { RequestQueueSender { inner: Arc::downgrade(self), } } pub(crate) fn check_live_proxies(self: &Arc) -> Result<()> { if Arc::weak_count(self) == 0 { return Err(io::ErrorKind::ConnectionAborted.into()); } Ok(()) } } pub(crate) struct RequestQueueSender { inner: Weak>, } impl RequestQueueSender { pub(crate) fn push(&self, request: T) -> Result<()> { if let Some(consumer) = self.inner.upgrade() { if consumer.queue.push(request).is_err() { debug!("Proxy[{:p}]: call failed - CH::requests full", self); return Err(io::ErrorKind::ConnectionAborted.into()); } return Ok(()); } debug!("Proxy[{:p}]: call failed - CH::requests dropped", self); Err(Error::new(ErrorKind::Other, "proxy send error")) } pub(crate) fn live_proxies(&self) -> usize { Weak::weak_count(&self.inner) } } impl Clone for RequestQueueSender { fn clone(&self) -> Self { Self { inner: self.inner.clone(), } } } impl std::fmt::Debug for RequestQueueSender { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RequestQueueProducer") .field("inner", &self.inner.as_ptr()) .finish() } } #[allow(clippy::type_complexity)] pub(crate) fn make_client( ) -> Result<(ClientHandler, Proxy)> { let requests = Arc::new(RequestQueue::new(RPC_CLIENT_INITIAL_PROXIES)); let proxy_req = requests.new_sender(); let handler = ClientHandler::new(requests); Ok((handler, Proxy::new(proxy_req))) } // Server-specific Handler implementation. // The IPC EventLoop Driver calls this to execute server-specific // RPC handling. Deserialized messages are passed via `consume` to the // associated `server` for processing. Server responses are then queued // for RPC to the associated client when `produce` is called. pub(crate) struct ServerHandler { server: S, in_flight: VecDeque, } impl Handler for ServerHandler { type In = S::ServerMessage; type Out = S::ClientMessage; fn consume(&mut self, message: Self::In) -> Result<()> { trace!("ServerHandler::consume"); let response = self.server.process(message); self.in_flight.push_back(response); Ok(()) } fn produce(&mut self) -> Result> { trace!("ServerHandler::produce"); // Return the ready response match self.in_flight.pop_front() { Some(res) => { trace!(" --> received response"); Ok(Some(res)) } None => { trace!(" --> no response ready"); Ok(None) } } } } const RPC_SERVER_INITIAL_CLIENTS: usize = 32; // Initial client allocation per server. pub(crate) fn make_server(server: S) -> ServerHandler { ServerHandler:: { server, in_flight: VecDeque::with_capacity(RPC_SERVER_INITIAL_CLIENTS), } } #[cfg(test)] mod test { use super::*; #[test] fn basic() { let queue = Arc::new(RequestQueue::new(1)); let producer = queue.new_sender(); assert!(queue.pop().is_none()); producer.push(1).unwrap(); assert!(queue.pop().is_some()); assert!(queue.pop().is_none()); } #[test] fn queue_dropped() { let queue = Arc::new(RequestQueue::new(1)); let producer = queue.new_sender(); drop(queue); assert!(producer.push(1).is_err()); } #[test] fn queue_full() { let queue = Arc::new(RequestQueue::new(1)); let producer = queue.new_sender(); producer.push(1).unwrap(); assert!(producer.push(2).is_err()); } #[test] fn queue_producer_clone() { let queue = Arc::new(RequestQueue::new(1)); let producer = queue.new_sender(); let producer2 = producer.clone(); producer.push(1).unwrap(); assert!(producer2.push(2).is_err()); } #[test] fn queue_producer_drop() { let queue = Arc::new(RequestQueue::new(1)); let producer = queue.new_sender(); let producer2 = producer.clone(); drop(producer); assert!(producer2.push(2).is_ok()); } #[test] fn queue_producer_weak() { let queue = Arc::new(RequestQueue::new(1)); let producer = queue.new_sender(); let producer2 = producer.clone(); drop(queue); assert!(producer2.push(2).is_err()); } #[test] fn queue_producer_shutdown() { let queue = Arc::new(RequestQueue::new(1)); let producer = queue.new_sender(); let producer2 = producer.clone(); producer.push(1).unwrap(); assert!(Arc::weak_count(&queue) == 2); drop(producer); assert!(Arc::weak_count(&queue) == 1); drop(producer2); assert!(Arc::weak_count(&queue) == 0); } }