use log::debug; use crate::durability::Durability; use crate::runtime::ActiveQuery; use crate::runtime::Revision; use crate::Cycle; use crate::DatabaseKeyIndex; use std::cell::RefCell; use std::sync::Arc; /// State that is specific to a single execution thread. /// /// Internally, this type uses ref-cells. /// /// **Note also that all mutations to the database handle (and hence /// to the local-state) must be undone during unwinding.** pub(super) struct LocalState { /// Vector of active queries. /// /// This is normally `Some`, but it is set to `None` /// while the query is blocked waiting for a result. /// /// Unwinding note: pushes onto this vector must be popped -- even /// during unwinding. query_stack: RefCell>>, } /// Summarizes "all the inputs that a query used" #[derive(Debug, Clone)] pub(crate) struct QueryRevisions { /// The most revision in which some input changed. pub(crate) changed_at: Revision, /// Minimum durability of the inputs to this query. pub(crate) durability: Durability, /// The inputs that went into our query, if we are tracking them. pub(crate) inputs: QueryInputs, } /// Every input. #[derive(Debug, Clone)] pub(crate) enum QueryInputs { /// Non-empty set of inputs, fully known Tracked { inputs: Arc<[DatabaseKeyIndex]> }, /// Empty set of inputs, fully known. NoInputs, /// Unknown quantity of inputs Untracked, } impl Default for LocalState { fn default() -> Self { LocalState { query_stack: RefCell::new(Some(Vec::new())), } } } impl LocalState { #[inline] pub(super) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> { let mut query_stack = self.query_stack.borrow_mut(); let query_stack = query_stack.as_mut().expect("local stack taken"); query_stack.push(ActiveQuery::new(database_key_index)); ActiveQueryGuard { local_state: self, database_key_index, push_len: query_stack.len(), } } fn with_query_stack(&self, c: impl FnOnce(&mut Vec) -> R) -> R { c(self .query_stack .borrow_mut() .as_mut() .expect("query stack taken")) } pub(super) fn query_in_progress(&self) -> bool { self.with_query_stack(|stack| !stack.is_empty()) } pub(super) fn active_query(&self) -> Option { self.with_query_stack(|stack| { stack .last() .map(|active_query| active_query.database_key_index) }) } pub(super) fn report_query_read_and_unwind_if_cycle_resulted( &self, input: DatabaseKeyIndex, durability: Durability, changed_at: Revision, ) { debug!( "report_query_read_and_unwind_if_cycle_resulted(input={:?}, durability={:?}, changed_at={:?})", input, durability, changed_at ); self.with_query_stack(|stack| { if let Some(top_query) = stack.last_mut() { top_query.add_read(input, durability, changed_at); // We are a cycle participant: // // C0 --> ... --> Ci --> Ci+1 -> ... -> Cn --> C0 // ^ ^ // : | // This edge -----+ | // | // | // N0 // // In this case, the value we have just read from `Ci+1` // is actually the cycle fallback value and not especially // interesting. We unwind now with `CycleParticipant` to avoid // executing the rest of our query function. This unwinding // will be caught and our own fallback value will be used. // // Note that `Ci+1` may` have *other* callers who are not // participants in the cycle (e.g., N0 in the graph above). // They will not have the `cycle` marker set in their // stack frames, so they will just read the fallback value // from `Ci+1` and continue on their merry way. if let Some(cycle) = &top_query.cycle { cycle.clone().throw() } } }) } pub(super) fn report_untracked_read(&self, current_revision: Revision) { self.with_query_stack(|stack| { if let Some(top_query) = stack.last_mut() { top_query.add_untracked_read(current_revision); } }) } /// Update the top query on the stack to act as though it read a value /// of durability `durability` which changed in `revision`. pub(super) fn report_synthetic_read(&self, durability: Durability, revision: Revision) { self.with_query_stack(|stack| { if let Some(top_query) = stack.last_mut() { top_query.add_synthetic_read(durability, revision); } }) } /// Takes the query stack and returns it. This is used when /// the current thread is blocking. The stack must be restored /// with [`Self::restore_query_stack`] when the thread unblocks. pub(super) fn take_query_stack(&self) -> Vec { assert!( self.query_stack.borrow().is_some(), "query stack already taken" ); self.query_stack.take().unwrap() } /// Restores a query stack taken with [`Self::take_query_stack`] once /// the thread unblocks. pub(super) fn restore_query_stack(&self, stack: Vec) { assert!(self.query_stack.borrow().is_none(), "query stack not taken"); self.query_stack.replace(Some(stack)); } } impl std::panic::RefUnwindSafe for LocalState {} /// When a query is pushed onto the `active_query` stack, this guard /// is returned to represent its slot. The guard can be used to pop /// the query from the stack -- in the case of unwinding, the guard's /// destructor will also remove the query. pub(crate) struct ActiveQueryGuard<'me> { local_state: &'me LocalState, push_len: usize, database_key_index: DatabaseKeyIndex, } impl ActiveQueryGuard<'_> { fn pop_helper(&self) -> ActiveQuery { self.local_state.with_query_stack(|stack| { // Sanity check: pushes and pops should be balanced. assert_eq!(stack.len(), self.push_len); debug_assert_eq!( stack.last().unwrap().database_key_index, self.database_key_index ); stack.pop().unwrap() }) } /// Invoked when the query has successfully completed execution. pub(super) fn complete(self) -> ActiveQuery { let query = self.pop_helper(); std::mem::forget(self); query } /// Pops an active query from the stack. Returns the [`QueryRevisions`] /// which summarizes the other queries that were accessed during this /// query's execution. #[inline] pub(crate) fn pop(self) -> QueryRevisions { // Extract accumulated inputs. let popped_query = self.complete(); // If this frame were a cycle participant, it would have unwound. assert!(popped_query.cycle.is_none()); popped_query.revisions() } /// If the active query is registered as a cycle participant, remove and /// return that cycle. pub(crate) fn take_cycle(&self) -> Option { self.local_state .with_query_stack(|stack| stack.last_mut()?.cycle.take()) } } impl Drop for ActiveQueryGuard<'_> { fn drop(&mut self) { self.pop_helper(); } }