// Copyright © 2021 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

use std::io::{self, Result};
use std::sync::{mpsc, Arc};
use std::thread;

use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker};
use slab::Slab;

use crate::messages::AssociateHandleForMessage;
use crate::rpccore::{
    make_client, make_server, Client, Handler, Proxy, RequestQueue, RequestQueueSender, Server,
};
use crate::{
    codec::Codec,
    codec::LengthDelimitedCodec,
    sys::{self, RecvMsg, SendMsg},
};

use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;

const WAKE_TOKEN: Token = Token(!0);

thread_local!(
    static IN_EVENTLOOP: std::cell::RefCell<Option<thread::ThreadId>> =
        std::cell::RefCell::new(None)
);

fn assert_not_in_event_loop_thread() {
    IN_EVENTLOOP.with(|b| {
        assert_ne!(*b.borrow(), Some(thread::current().id()));
    });
}

// Requests sent by an EventLoopHandle to be serviced by
// the handle's associated EventLoop.
enum Request {
    // See EventLoop::add_connection
    AddConnection(
        sys::Pipe,
        Box<dyn Driver + Send>,
        mpsc::Sender<Result<Token>>,
    ),
    // See EventLoop::shutdown
    Shutdown,
    // See EventLoop::wake_connection
    WakeConnection(Token),
}

// EventLoopHandle is a cloneable external reference to a running EventLoop,
// allowing registration of new client and server connections, in addition to
// requesting the EventLoop shut down cleanly.
#[derive(Clone, Debug)]
pub struct EventLoopHandle {
    waker: Arc<Waker>,
    requests: RequestQueueSender<Request>,
}

impl EventLoopHandle {
    pub fn bind_client<C: Client + 'static>(
        &self,
        connection: sys::Pipe,
    ) -> Result<Proxy<<C as Client>::ServerMessage, <C as Client>::ClientMessage>>
    where
        <C as Client>::ServerMessage: Serialize + Debug + AssociateHandleForMessage + Send,
        <C as Client>::ClientMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
    {
        let (handler, mut proxy) = make_client::<C>()?;
        let driver = Box::new(FramedDriver::new(handler));
        let r = self.add_connection(connection, driver);
        trace!("EventLoop::bind_client {:?}", r);
        r.map(|token| {
            proxy.connect_event_loop(self.clone(), token);
            proxy
        })
    }

    pub fn bind_server<S: Server + Send + 'static>(
        &self,
        server: S,
        connection: sys::Pipe,
    ) -> Result<()>
    where
        <S as Server>::ServerMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
        <S as Server>::ClientMessage: Serialize + Debug + AssociateHandleForMessage + Send,
    {
        let handler = make_server::<S>(server);
        let driver = Box::new(FramedDriver::new(handler));
        let r = self.add_connection(connection, driver);
        trace!("EventLoop::bind_server {:?}", r);
        r.map(|_| ())
    }

    // Register a new connection with associated driver on the EventLoop.
    // TODO: Since this is called from a Gecko main thread, make this
    // non-blocking wrt. the EventLoop.
    fn add_connection(
        &self,
        connection: sys::Pipe,
        driver: Box<dyn Driver + Send>,
    ) -> Result<Token> {
        assert_not_in_event_loop_thread();
        let (tx, rx) = mpsc::channel();
        self.requests
            .push(Request::AddConnection(connection, driver, tx))
            .map_err(|_| {
                debug!("EventLoopHandle::add_connection send failed");
                io::ErrorKind::ConnectionAborted
            })?;
        self.waker.wake()?;
        rx.recv().map_err(|_| {
            debug!("EventLoopHandle::add_connection recv failed");
            io::ErrorKind::ConnectionAborted
        })?
    }

    // Signal EventLoop to shut down.  Causes EventLoop::poll to return Ok(false).
    fn shutdown(&self) -> Result<()> {
        self.requests.push(Request::Shutdown).map_err(|_| {
            debug!("EventLoopHandle::shutdown send failed");
            io::ErrorKind::ConnectionAborted
        })?;
        self.waker.wake()
    }
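
    // A minimal usage sketch, mirroring the tests at the bottom of this file.
    // `MyServer`, `MyClient`, and `MyServerMessage` are hypothetical
    // `Server`/`Client` implementations and message types; error handling is
    // elided via `?`:
    //
    //   let server_elt = EventLoopThread::new("server".to_string(), None, || {}, || {})?;
    //   let client_elt = EventLoopThread::new("client".to_string(), None, || {}, || {})?;
    //   let (server_pipe, client_pipe) = sys::make_pipe_pair()?;
    //   server_elt.handle().bind_server(MyServer {}, server_pipe)?;
    //   let client_pipe = unsafe { sys::Pipe::from_raw_handle(client_pipe) };
    //   let proxy = client_elt.handle().bind_client::<MyClient>(client_pipe)?;
    //   let response = proxy.call(MyServerMessage::Request)?;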
    // Signal EventLoop to wake connection specified by `token` for processing.
    pub(crate) fn wake_connection(&self, token: Token) {
        if self.requests.push(Request::WakeConnection(token)).is_ok() {
            self.waker.wake().expect("wake failed");
        }
    }
}

// EventLoop owns all registered connections, and is responsible for calling
// each connection's `handle_event` or `handle_wake` any time a readiness or
// wake event associated with that connection is produced.
struct EventLoop {
    poll: Poll,
    events: Events,
    waker: Arc<Waker>,
    name: String,
    connections: Slab<Connection>,
    requests: Arc<RequestQueue<Request>>,
}

const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow.
const EVENT_LOOP_EVENTS_PER_ITERATION: usize = 256; // Number of events per poll() step, arbitrary limit.

impl EventLoop {
    fn new(name: String) -> Result<EventLoop> {
        let poll = Poll::new()?;
        let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?);
        let eventloop = EventLoop {
            poll,
            events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION),
            waker,
            name,
            connections: Slab::with_capacity(EVENT_LOOP_INITIAL_CLIENTS),
            requests: Arc::new(RequestQueue::new(EVENT_LOOP_INITIAL_CLIENTS)),
        };
        Ok(eventloop)
    }

    // Return a cloneable handle for controlling the EventLoop externally.
    fn handle(&mut self) -> EventLoopHandle {
        EventLoopHandle {
            waker: self.waker.clone(),
            requests: self.requests.new_sender(),
        }
    }

    // Register a connection and driver.
    fn add_connection(
        &mut self,
        connection: sys::Pipe,
        driver: Box<dyn Driver + Send>,
    ) -> Result<Token> {
        if self.connections.len() == self.connections.capacity() {
            trace!("{}: connection slab full, insert will allocate", self.name);
        }
        let entry = self.connections.vacant_entry();
        let token = Token(entry.key());
        assert_ne!(token, WAKE_TOKEN);
        let connection = Connection::new(connection, token, driver, self.poll.registry())?;
        debug!("{}: {:?}: new connection", self.name, token);
        entry.insert(connection);
        Ok(token)
    }

    // Step EventLoop once.  Call this in a loop from a dedicated thread.
    // Returns false if EventLoop is shutting down.
    // Each step may call `handle_event` on any registered connection that
    // has received readiness events from the poll wakeup.
    fn poll(&mut self) -> Result<bool> {
        loop {
            let r = self.poll.poll(&mut self.events, None);
            match r {
                Ok(()) => break,
                Err(ref e) if interrupted(e) => continue,
                Err(e) => return Err(e),
            }
        }

        for event in self.events.iter() {
            match event.token() {
                WAKE_TOKEN => {
                    debug!("{}: WAKE: wake event, will process requests", self.name);
                }
                token => {
                    debug!("{}: {:?}: connection event: {:?}", self.name, token, event);
                    let done = if let Some(connection) = self.connections.get_mut(token.0) {
                        match connection.handle_event(event, self.poll.registry()) {
                            Ok(done) => {
                                debug!("{}: connection {:?} done={}", self.name, token, done);
                                done
                            }
                            Err(e) => {
                                debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
                                true
                            }
                        }
                    } else {
                        // Spurious event, log and ignore.
                        debug!(
                            "{}: {:?}: token not found in slab: {:?}",
                            self.name, token, event
                        );
                        false
                    };
                    if done {
                        debug!("{}: {:?}: done, removing", self.name, token);
                        let mut connection = self.connections.remove(token.0);
                        if let Err(e) = connection.shutdown(self.poll.registry()) {
                            debug!(
                                "{}: {:?}: closing connection failed: {:?}",
                                self.name, token, e
                            );
                        }
                    }
                }
            }
        }

        // If the waker was signalled there may be pending requests to process.
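        // Note: requests are drained on every poll iteration, not only when a
        // WAKE_TOKEN event was seen; popping an empty queue is cheap, and this
        // avoids missing a request enqueued after events were collected.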
        while let Some(req) = self.requests.pop() {
            match req {
                Request::AddConnection(pipe, driver, tx) => {
                    debug!("{}: EventLoop: handling add_connection", self.name);
                    let r = self.add_connection(pipe, driver);
                    tx.send(r).expect("EventLoop::add_connection");
                }
                Request::Shutdown => {
                    debug!("{}: EventLoop: handling shutdown", self.name);
                    return Ok(false);
                }
                Request::WakeConnection(token) => {
                    debug!(
                        "{}: EventLoop: handling wake_connection {:?}",
                        self.name, token
                    );
                    let done = if let Some(connection) = self.connections.get_mut(token.0) {
                        match connection.handle_wake(self.poll.registry()) {
                            Ok(done) => done,
                            Err(e) => {
                                debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
                                true
                            }
                        }
                    } else {
                        // Spurious wake, log and ignore.
                        debug!(
                            "{}: {:?}: token not found in slab: wake_connection",
                            self.name, token
                        );
                        false
                    };
                    if done {
                        debug!("{}: {:?}: done (wake), removing", self.name, token);
                        let mut connection = self.connections.remove(token.0);
                        if let Err(e) = connection.shutdown(self.poll.registry()) {
                            debug!(
                                "{}: {:?}: closing connection failed: {:?}",
                                self.name, token, e
                            );
                        }
                    }
                }
            }
        }

        Ok(true)
    }
}

impl Drop for EventLoop {
    fn drop(&mut self) {
        debug!("{}: EventLoop drop", self.name);
        for (token, connection) in &mut self.connections {
            debug!(
                "{}: EventLoop drop - closing connection for {:?}",
                self.name, token
            );
            if let Err(e) = connection.shutdown(self.poll.registry()) {
                debug!(
                    "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
                    self.name, token, e
                );
            }
        }
        debug!("{}: EventLoop drop done", self.name);
    }
}

// Connection wraps an interprocess connection (Pipe) and manages receiving
// inbound and sending outbound buffers (and associated handles, if any).
// The associated driver is responsible for message framing and serialization.
struct Connection {
    io: sys::Pipe,
    token: Token,
    interest: Option<Interest>,
    inbound: sys::ConnectionBuffer,
    outbound: sys::ConnectionBuffer,
    driver: Box<dyn Driver + Send>,
}

pub(crate) const IPC_CLIENT_BUFFER_SIZE: usize = 16384;

impl Connection {
    fn new(
        mut io: sys::Pipe,
        token: Token,
        driver: Box<dyn Driver + Send>,
        registry: &Registry,
    ) -> Result<Connection> {
        let interest = Interest::READABLE;
        registry.register(&mut io, token, interest)?;
        Ok(Connection {
            io,
            token,
            interest: Some(interest),
            inbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
            outbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
            driver,
        })
    }

    fn shutdown(&mut self, registry: &Registry) -> Result<()> {
        trace!(
            "{:?}: connection shutdown interest={:?}",
            self.token,
            self.interest
        );
        let r = self.io.shutdown();
        trace!("{:?}: connection shutdown r={:?}", self.token, r);
        self.interest = None;
        registry.deregister(&mut self.io)
    }

    // Connections are always interested in READABLE.  clear_readable is only
    // called when the connection is in the process of shutting down.
    fn clear_readable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            self.interest.and_then(|i| i.remove(Interest::READABLE)),
        )
    }

    // Connections toggle WRITABLE based on the state of the `outbound` buffer.
    fn set_writable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            Some(
                self.interest
                    .map_or_else(|| Interest::WRITABLE, |i| i.add(Interest::WRITABLE)),
            ),
        )
    }

    fn clear_writable(&mut self, registry: &Registry) -> Result<()> {
        self.update_registration(
            registry,
            self.interest.and_then(|i| i.remove(Interest::WRITABLE)),
        )
    }
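
    // A note on mio's `Interest` API, which the helpers above rely on: it is
    // a non-empty flag set, so `add` unions flags and `remove` returns `None`
    // once the last flag is cleared, matching the `Option<Interest>` stored
    // in `self.interest`. A minimal sketch:
    //
    //   let i = Interest::READABLE.add(Interest::WRITABLE);
    //   assert_eq!(i.remove(Interest::WRITABLE), Some(Interest::READABLE));
    //   assert_eq!(Interest::READABLE.remove(Interest::READABLE), None);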
    // Update connection registration with the current readiness event interests.
    fn update_registration(
        &mut self,
        registry: &Registry,
        new_interest: Option<Interest>,
    ) -> Result<()> {
        // Note: Updating registration always triggers a writable event with
        // NamedPipes, so it's important to skip updating registration when
        // the set of interests hasn't changed.
        if new_interest != self.interest {
            trace!(
                "{:?}: updating readiness registration old={:?} new={:?}",
                self.token,
                self.interest,
                new_interest
            );
            self.interest = new_interest;
            if let Some(interest) = self.interest {
                registry.reregister(&mut self.io, self.token, interest)?;
            } else {
                registry.deregister(&mut self.io)?;
            }
        }
        Ok(())
    }

    // Handle readiness event.  Errors returned are fatal for this connection,
    // resulting in removal from the EventLoop connection list.
    // The EventLoop will call this for any connection that has received an event.
    fn handle_event(&mut self, event: &Event, registry: &Registry) -> Result<bool> {
        debug!("{:?}: handling event {:?}", self.token, event);
        assert_eq!(self.token, event.token());

        let done = if event.is_readable() {
            self.recv_inbound()?
        } else {
            trace!("{:?}: not readable", self.token);
            false
        };

        self.flush_outbound()?;
        if self.send_outbound(registry)? {
            // Hit EOF during send.
            return Ok(true);
        }

        debug!(
            "{:?}: handling event done (recv done={}, outbound={})",
            self.token,
            done,
            self.outbound.is_empty()
        );
        let done = done && self.outbound.is_empty();
        // If driver is done and outbound is clear, unregister connection.
        if done {
            trace!("{:?}: driver done, clearing read interest", self.token);
            self.clear_readable(registry)?;
        }
        Ok(done)
    }

    // Handle wake event.  Errors returned are fatal for this connection,
    // resulting in removal from the EventLoop connection list.
    // The EventLoop will call this to clear the outbound buffer for any
    // connection that has received a wake event.
    fn handle_wake(&mut self, registry: &Registry) -> Result<bool> {
        debug!("{:?}: handling wake", self.token);

        self.flush_outbound()?;
        if self.send_outbound(registry)? {
            // Hit EOF during send.
            return Ok(true);
        }

        debug!("{:?}: handling wake done", self.token);
        Ok(false)
    }

    fn recv_inbound(&mut self) -> Result<bool> {
        // If the connection is readable, read into inbound and pass to driver
        // for processing until all ready data has been consumed.
        loop {
            trace!("{:?}: pre-recv inbound: {:?}", self.token, self.inbound);
            let r = self.io.recv_msg(&mut self.inbound);
            match r {
                Ok(0) => {
                    trace!(
                        "{:?}: recv EOF unprocessed inbound={}",
                        self.token,
                        self.inbound.is_empty()
                    );
                    return Ok(true);
                }
                Ok(n) => {
                    trace!("{:?}: recv bytes: {}, process_inbound", self.token, n);
                    let r = self.driver.process_inbound(&mut self.inbound);
                    trace!("{:?}: process_inbound done: {:?}", self.token, r);
                    match r {
                        Ok(done) => {
                            if done {
                                return Ok(true);
                            }
                        }
                        Err(e) => {
                            debug!(
                                "{:?}: process_inbound error: {:?} unprocessed inbound={}",
                                self.token,
                                e,
                                self.inbound.is_empty()
                            );
                            return Err(e);
                        }
                    }
                }
                Err(ref e) if would_block(e) => {
                    trace!("{:?}: recv would_block: {:?}", self.token, e);
                    return Ok(false);
                }
                Err(ref e) if interrupted(e) => {
                    trace!("{:?}: recv interrupted: {:?}", self.token, e);
                    continue;
                }
                Err(e) => {
                    debug!("{:?}: recv error: {:?}", self.token, e);
                    return Err(e);
                }
            }
        }
    }

    fn flush_outbound(&mut self) -> Result<()> {
        // Enqueue outbound messages to the outbound buffer, then try to write
        // out to connection.  There may be outbound messages even if there was
        // no inbound processing, so always attempt to enqueue and flush.
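        // (Only the serialization into `self.outbound` happens here; the
        // actual write to the pipe is done by `send_outbound`, which callers
        // invoke immediately after this.)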
trace!("{:?}: flush_outbound", self.token); let r = self.driver.flush_outbound(&mut self.outbound); trace!("{:?}: flush_outbound done: {:?}", self.token, r); if let Err(e) = r { debug!("{:?}: flush_outbound error: {:?}", self.token, e); return Err(e); } Ok(()) } fn send_outbound(&mut self, registry: &Registry) -> Result { // Attempt to flush outbound buffer. If the connection's write buffer is full, register for WRITABLE // and complete flushing when associated notitication arrives later. while !self.outbound.is_empty() { let r = self.io.send_msg(&mut self.outbound); match r { Ok(0) => { trace!("{:?}: send EOF", self.token); return Ok(true); } Ok(n) => { trace!("{:?}: send bytes: {}", self.token, n); } Err(ref e) if would_block(e) => { trace!( "{:?}: send would_block: {:?}, setting write interest", self.token, e ); // Register for write events. self.set_writable(registry)?; break; } Err(ref e) if interrupted(e) => { trace!("{:?}: send interrupted: {:?}", self.token, e); continue; } Err(e) => { debug!("{:?}: send error: {:?}", self.token, e); return Err(e); } } trace!("{:?}: post-send: outbound {:?}", self.token, self.outbound); } // Outbound buffer flushed, clear registration for WRITABLE. if self.outbound.is_empty() { trace!("{:?}: outbound empty, clearing write interest", self.token); self.clear_writable(registry)?; } Ok(false) } } impl Drop for Connection { fn drop(&mut self) { debug!("{:?}: Connection drop", self.token); } } fn would_block(err: &std::io::Error) -> bool { err.kind() == std::io::ErrorKind::WouldBlock } fn interrupted(err: &std::io::Error) -> bool { err.kind() == std::io::ErrorKind::Interrupted } // Driver only has a single implementation, but must be hidden behind a Trait object to // hide the varying FramedDriver sizes (due to different `T` values). trait Driver { // Handle inbound messages. Returns true if Driver is done; this will trigger Connection removal and cleanup. fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result; // Write outbound messages to `outbound`. fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()>; } // Length-delimited connection framing and (de)serialization is handled by the inbound and outbound processing. // Handlers can then process message Requests and Responses without knowledge of serialization or // handle remoting. impl Driver for FramedDriver where T: Handler, T::In: DeserializeOwned + Debug + AssociateHandleForMessage, T::Out: Serialize + Debug + AssociateHandleForMessage, { // Caller passes `inbound` data, this function will trim any complete messages from `inbound` and pass them to the handler for processing. fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result { debug!("process_inbound: {:?}", inbound); // Repeatedly call `decode` as long as it produces items, passing each produced item to the handler to action. #[allow(unused_mut)] while let Some(mut item) = self.codec.decode(&mut inbound.buf)? { if item.has_associated_handle() { // On Unix, dequeue a handle from the connection and update the item's handle. #[cfg(unix)] { let new = inbound .pop_handle() .expect("inbound handle expected for item"); unsafe { item.set_local_handle(new.take()) }; } // On Windows, the deserialized item contains the correct handle value, so // convert it to an owned handle on the item. 
                #[cfg(windows)]
                {
                    assert!(inbound.pop_handle().is_none());
                    unsafe { item.set_local_handle() };
                }
            }
            self.handler.consume(item)?;
        }
        Ok(false)
    }

    // Caller will try to write `outbound` to the associated connection,
    // queuing any data that can't be transmitted immediately.
    fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()> {
        debug!("flush_outbound: {:?}", outbound.buf);

        // Repeatedly grab outgoing items from the handler, passing each to
        // `encode` for serialization into `outbound`.
        while let Some(mut item) = self.handler.produce()? {
            let handle = if item.has_associated_handle() {
                #[allow(unused_mut)]
                let mut handle = item.take_handle();
                // On Windows, the handle is transferred by duplicating it
                // into the target remote process.
                #[cfg(windows)]
                unsafe {
                    item.set_remote_handle(handle.send_to_target()?);
                }
                Some(handle)
            } else {
                None
            };
            self.codec.encode(item, &mut outbound.buf)?;
            if let Some(handle) = handle {
                // `outbound` retains ownership of the handle until the
                // associated encoded item in `outbound.buf` is sent to the
                // remote process.
                outbound.push_handle(handle);
            }
        }
        Ok(())
    }
}

struct FramedDriver<T: Handler> {
    codec: LengthDelimitedCodec<T::In, T::Out>,
    handler: T,
}

impl<T: Handler> FramedDriver<T> {
    fn new(handler: T) -> FramedDriver<T> {
        FramedDriver {
            codec: Default::default(),
            handler,
        }
    }
}

#[derive(Debug)]
pub struct EventLoopThread {
    thread: Option<thread::JoinHandle<Result<()>>>,
    name: String,
    handle: EventLoopHandle,
}

impl EventLoopThread {
    pub fn new<F1, F2>(
        name: String,
        stack_size: Option<usize>,
        after_start: F1,
        before_stop: F2,
    ) -> Result<EventLoopThread>
    where
        F1: Fn() + Send + 'static,
        F2: Fn() + Send + 'static,
    {
        let mut event_loop = EventLoop::new(name.clone())?;
        let handle = event_loop.handle();

        let builder = thread::Builder::new()
            .name(name.clone())
            .stack_size(stack_size.unwrap_or(64 * 4096));
        let thread = builder.spawn(move || {
            trace!("{}: event loop thread enter", event_loop.name);
            // Record this thread's id so assert_not_in_event_loop_thread()
            // can detect blocking EventLoopHandle calls made from the event
            // loop thread itself.
            IN_EVENTLOOP.with(|b| {
                *b.borrow_mut() = Some(thread::current().id());
            });
            after_start();
            let _thread_exit_guard = scopeguard::guard((), |_| before_stop());
            let r = loop {
                let start = std::time::Instant::now();
                let r = event_loop.poll();
                trace!(
                    "{}: event loop poll r={:?}, took={}μs",
                    event_loop.name,
                    r,
                    start.elapsed().as_micros()
                );
                match r {
                    Ok(true) => continue,
                    _ => break r,
                }
            };
            trace!("{}: event loop thread exit", event_loop.name);
            r.map(|_| ())
        })?;

        Ok(EventLoopThread {
            thread: Some(thread),
            name,
            handle,
        })
    }

    pub fn handle(&self) -> &EventLoopHandle {
        &self.handle
    }
}
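
// A minimal usage sketch (hypothetical callbacks; see the tests below for a
// complete example). The callbacks run on the spawned thread, bracketing the
// poll loop:
//
//   let elt = EventLoopThread::new(
//       "audio-ipc".to_string(),
//       None,                           // default stack size
//       || println!("thread started"),  // after_start: runs before polling
//       || println!("thread stopping"), // before_stop: runs on thread exit
//   )?;
//   // ... bind_client/bind_server via elt.handle() ...
//   drop(elt); // signals shutdown and joins the thread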
impl Drop for EventLoopThread {
    // Shut down event loop and executor thread.  Blocks until complete.
    fn drop(&mut self) {
        trace!("{}: EventLoopThread shutdown", self.name);
        if let Err(e) = self.handle.shutdown() {
            debug!("{}: initiating shutdown failed: {:?}", self.name, e);
        }
        let thread = self.thread.take().expect("event loop thread");
        if let Err(e) = thread.join() {
            error!("{}: EventLoopThread failed: {:?}", self.name, e);
        }
        trace!("{}: EventLoopThread shutdown done", self.name);
    }
}

#[cfg(test)]
mod test {
    use serde_derive::{Deserialize, Serialize};

    use super::*;

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    enum TestServerMessage {
        TestRequest,
    }

    impl AssociateHandleForMessage for TestServerMessage {}

    struct TestServerImpl {}

    impl Server for TestServerImpl {
        type ServerMessage = TestServerMessage;
        type ClientMessage = TestClientMessage;

        fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
            assert_eq!(req, TestServerMessage::TestRequest);
            TestClientMessage::TestResponse
        }
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    enum TestClientMessage {
        TestResponse,
    }

    impl AssociateHandleForMessage for TestClientMessage {}

    struct TestClientImpl {}

    impl Client for TestClientImpl {
        type ServerMessage = TestServerMessage;
        type ClientMessage = TestClientMessage;
    }

    fn init() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    fn setup() -> (
        EventLoopThread,
        EventLoopThread,
        Proxy<TestServerMessage, TestClientMessage>,
    ) {
        // Server setup and registration.
        let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
            .expect("server EventLoopThread");
        let server_handle = server.handle();

        let (server_pipe, client_pipe) = sys::make_pipe_pair().expect("server make_pipe_pair");
        server_handle
            .bind_server(TestServerImpl {}, server_pipe)
            .expect("server bind_server");

        // Client setup and registration.
        let client = EventLoopThread::new("test-client".to_string(), None, || {}, || {})
            .expect("client EventLoopThread");
        let client_handle = client.handle();

        let client_pipe = unsafe { sys::Pipe::from_raw_handle(client_pipe) };
        let client_proxy = client_handle
            .bind_client::<TestClientImpl>(client_pipe)
            .expect("client bind_client");

        (server, client, client_proxy)
    }

    // Verify basic EventLoopThread functionality works.  Create a server and
    // client EventLoopThread, then send a single message from the client to
    // the server and wait for the expected response.
    #[test]
    fn basic() {
        init();
        let (server, client, client_proxy) = setup();

        // RPC message from client to server.
        let response = client_proxy.call(TestServerMessage::TestRequest);
        let response = response.expect("client response");
        assert_eq!(response, TestClientMessage::TestResponse);

        // Explicit shutdown.
        drop(client);
        drop(server);
    }

    // Same as `basic`, but shut down server before client.
    #[test]
    fn basic_reverse_drop_order() {
        init();
        let (server, client, client_proxy) = setup();

        // RPC message from client to server.
        let response = client_proxy.call(TestServerMessage::TestRequest);
        let response = response.expect("client response");
        assert_eq!(response, TestClientMessage::TestResponse);

        // Explicit shutdown.
        drop(server);
        drop(client);
    }

    #[test]
    fn dead_server() {
        init();
        let (server, _client, client_proxy) = setup();
        drop(server);

        let response = client_proxy.call(TestServerMessage::TestRequest);
        response.expect_err("sending on closed channel");
    }

    #[test]
    fn dead_client() {
        init();
        let (_server, client, client_proxy) = setup();
        drop(client);

        let response = client_proxy.call(TestServerMessage::TestRequest);
        response.expect_err("sending on a closed channel");
    }

    #[test]
    fn disconnected_handle() {
        init();
        let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
            .expect("server EventLoopThread");
        let server_handle = server.handle().clone();
        drop(server);

        server_handle
            .shutdown()
            .expect_err("sending on closed channel");
    }

    #[test]
    fn clone_after_drop() {
        init();
        let (server, client, client_proxy) = setup();
        drop(server);
        drop(client);

        let clone = client_proxy.clone();
        let response = clone.call(TestServerMessage::TestRequest);
        response.expect_err("sending to a dropped ClientHandler");
    }

    #[test]
    fn basic_event_loop_thread_callbacks() {
        init();
        let (start_tx, start_rx) = mpsc::channel();
        let (stop_tx, stop_rx) = mpsc::channel();
        let elt = EventLoopThread::new(
            "test-thread-callbacks".to_string(),
            None,
            move || start_tx.send(()).unwrap(),
            move || stop_tx.send(()).unwrap(),
        )
        .expect("server EventLoopThread");
        start_rx.recv().expect("after_start callback done");
        drop(elt);
        stop_rx.recv().expect("before_stop callback done");
    }
}