//! POSIX conditional variable implementation based on user-space wait queues. use super::{abi, error::expect_success_aborting, spin::SpinMutex, task, time::with_tmos_strong}; use crate::{mem::replace, ptr::NonNull, sys::locks::Mutex, time::Duration}; // The implementation is inspired by the queue-based implementation shown in // Andrew D. Birrell's paper "Implementing Condition Variables with Semaphores" pub struct Condvar { waiters: SpinMutex, } unsafe impl Send for Condvar {} unsafe impl Sync for Condvar {} impl Condvar { #[inline] pub const fn new() -> Condvar { Condvar { waiters: SpinMutex::new(waiter_queue::WaiterQueue::new()) } } pub fn notify_one(&self) { self.waiters.with_locked(|waiters| { if let Some(task) = waiters.pop_front() { // Unpark the task match unsafe { abi::wup_tsk(task) } { // The task already has a token. abi::E_QOVR => {} // Can't undo the effect; abort the program on failure er => { expect_success_aborting(er, &"wup_tsk"); } } } }); } pub fn notify_all(&self) { self.waiters.with_locked(|waiters| { while let Some(task) = waiters.pop_front() { // Unpark the task match unsafe { abi::wup_tsk(task) } { // The task already has a token. abi::E_QOVR => {} // Can't undo the effect; abort the program on failure er => { expect_success_aborting(er, &"wup_tsk"); } } } }); } pub unsafe fn wait(&self, mutex: &Mutex) { // Construct `Waiter`. let mut waiter = waiter_queue::Waiter::new(); let waiter = NonNull::from(&mut waiter); self.waiters.with_locked(|waiters| unsafe { waiters.insert(waiter); }); unsafe { mutex.unlock() }; // Wait until `waiter` is removed from the queue loop { // Park the current task expect_success_aborting(unsafe { abi::slp_tsk() }, &"slp_tsk"); if !self.waiters.with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) { break; } } mutex.lock(); } pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { // Construct and pin `Waiter` let mut waiter = waiter_queue::Waiter::new(); let waiter = NonNull::from(&mut waiter); self.waiters.with_locked(|waiters| unsafe { waiters.insert(waiter); }); unsafe { mutex.unlock() }; // Park the current task and do not wake up until the timeout elapses // or the task gets woken up by `notify_*` match with_tmos_strong(dur, |tmo| { let er = unsafe { abi::tslp_tsk(tmo) }; if er == 0 { // We were unparked. Are we really dequeued? if self.waiters.with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) { // No we are not. Continue waiting. return abi::E_TMOUT; } } er }) { abi::E_TMOUT => {} er => { expect_success_aborting(er, &"tslp_tsk"); } } // Remove `waiter` from `self.waiters`. If `waiter` is still in // `waiters`, it means we woke up because of a timeout. Otherwise, // we woke up because of `notify_*`. let success = self.waiters.with_locked(|waiters| unsafe { !waiters.remove(waiter) }); mutex.lock(); success } } mod waiter_queue { use super::*; pub struct WaiterQueue { head: Option, } #[derive(Copy, Clone)] struct ListHead { first: NonNull, last: NonNull, } unsafe impl Send for ListHead {} unsafe impl Sync for ListHead {} pub struct Waiter { // These fields are only accessed through `&[mut] WaiterQueue`. /// The waiting task's ID. Will be zeroed when the task is woken up /// and removed from a queue. task: abi::ID, priority: abi::PRI, prev: Option>, next: Option>, } unsafe impl Send for Waiter {} unsafe impl Sync for Waiter {} impl Waiter { #[inline] pub fn new() -> Self { let task = task::current_task_id(); let priority = task::task_priority(abi::TSK_SELF); // Zeroness of `Waiter::task` indicates whether the `Waiter` is // linked to a queue or not. This invariant is important for // the correctness. debug_assert_ne!(task, 0); Self { task, priority, prev: None, next: None } } } impl WaiterQueue { #[inline] pub const fn new() -> Self { Self { head: None } } /// # Safety /// /// - The caller must own `*waiter_ptr`. The caller will lose the /// ownership until `*waiter_ptr` is removed from `self`. /// /// - `*waiter_ptr` must be valid until it's removed from the queue. /// /// - `*waiter_ptr` must not have been previously inserted to a `WaiterQueue`. /// pub unsafe fn insert(&mut self, mut waiter_ptr: NonNull) { unsafe { let waiter = waiter_ptr.as_mut(); debug_assert!(waiter.prev.is_none()); debug_assert!(waiter.next.is_none()); if let Some(head) = &mut self.head { // Find the insertion position and insert `waiter` let insert_after = { let mut cursor = head.last; loop { if waiter.priority >= cursor.as_ref().priority { // `cursor` and all previous waiters have the same or higher // priority than `current_task_priority`. Insert the new // waiter right after `cursor`. break Some(cursor); } cursor = if let Some(prev) = cursor.as_ref().prev { prev } else { break None; }; } }; if let Some(mut insert_after) = insert_after { // Insert `waiter` after `insert_after` let insert_before = insert_after.as_ref().next; waiter.prev = Some(insert_after); insert_after.as_mut().next = Some(waiter_ptr); waiter.next = insert_before; if let Some(mut insert_before) = insert_before { insert_before.as_mut().prev = Some(waiter_ptr); } else { head.last = waiter_ptr; } } else { // Insert `waiter` to the front waiter.next = Some(head.first); head.first.as_mut().prev = Some(waiter_ptr); head.first = waiter_ptr; } } else { // `waiter` is the only element self.head = Some(ListHead { first: waiter_ptr, last: waiter_ptr }); } } } /// Given a `Waiter` that was previously inserted to `self`, remove /// it from `self` if it's still there. #[inline] pub unsafe fn remove(&mut self, mut waiter_ptr: NonNull) -> bool { unsafe { let waiter = waiter_ptr.as_mut(); if waiter.task != 0 { let head = self.head.as_mut().unwrap(); match (waiter.prev, waiter.next) { (Some(mut prev), Some(mut next)) => { prev.as_mut().next = Some(next); next.as_mut().prev = Some(prev); } (None, Some(mut next)) => { head.first = next; next.as_mut().prev = None; } (Some(mut prev), None) => { prev.as_mut().next = None; head.last = prev; } (None, None) => { self.head = None; } } waiter.task = 0; true } else { false } } } /// Given a `Waiter` that was previously inserted to `self`, return a /// flag indicating whether it's still in `self`. #[inline] pub unsafe fn is_queued(&self, waiter: NonNull) -> bool { unsafe { waiter.as_ref().task != 0 } } #[inline] pub fn pop_front(&mut self) -> Option { unsafe { let head = self.head.as_mut()?; let waiter = head.first.as_mut(); // Get the ID let id = replace(&mut waiter.task, 0); // Unlink the waiter if let Some(mut next) = waiter.next { head.first = next; next.as_mut().prev = None; } else { self.head = None; } Some(id) } } } }