summaryrefslogtreecommitdiffstats
path: root/library/std/src/sys/itron/condvar.rs
diff options
context:
space:
mode:
Diffstat (limited to 'library/std/src/sys/itron/condvar.rs')
-rw-r--r--library/std/src/sys/itron/condvar.rs297
1 files changed, 297 insertions, 0 deletions
diff --git a/library/std/src/sys/itron/condvar.rs b/library/std/src/sys/itron/condvar.rs
new file mode 100644
index 000000000..008cd8fb1
--- /dev/null
+++ b/library/std/src/sys/itron/condvar.rs
@@ -0,0 +1,297 @@
+//! POSIX conditional variable implementation based on user-space wait queues.
+use super::{abi, error::expect_success_aborting, spin::SpinMutex, task, time::with_tmos_strong};
+use crate::{mem::replace, ptr::NonNull, sys::locks::Mutex, time::Duration};
+
+// The implementation is inspired by the queue-based implementation shown in
+// Andrew D. Birrell's paper "Implementing Condition Variables with Semaphores"
+
+pub struct Condvar {
+ waiters: SpinMutex<waiter_queue::WaiterQueue>,
+}
+
+unsafe impl Send for Condvar {}
+unsafe impl Sync for Condvar {}
+
+pub type MovableCondvar = Condvar;
+
+impl Condvar {
+ #[inline]
+ pub const fn new() -> Condvar {
+ Condvar { waiters: SpinMutex::new(waiter_queue::WaiterQueue::new()) }
+ }
+
+ #[inline]
+ pub unsafe fn init(&mut self) {}
+
+ pub unsafe fn notify_one(&self) {
+ self.waiters.with_locked(|waiters| {
+ if let Some(task) = waiters.pop_front() {
+ // Unpark the task
+ match unsafe { abi::wup_tsk(task) } {
+ // The task already has a token.
+ abi::E_QOVR => {}
+ // Can't undo the effect; abort the program on failure
+ er => {
+ expect_success_aborting(er, &"wup_tsk");
+ }
+ }
+ }
+ });
+ }
+
+ pub unsafe fn notify_all(&self) {
+ self.waiters.with_locked(|waiters| {
+ while let Some(task) = waiters.pop_front() {
+ // Unpark the task
+ match unsafe { abi::wup_tsk(task) } {
+ // The task already has a token.
+ abi::E_QOVR => {}
+ // Can't undo the effect; abort the program on failure
+ er => {
+ expect_success_aborting(er, &"wup_tsk");
+ }
+ }
+ }
+ });
+ }
+
+ pub unsafe fn wait(&self, mutex: &Mutex) {
+ // Construct `Waiter`.
+ let mut waiter = waiter_queue::Waiter::new();
+ let waiter = NonNull::from(&mut waiter);
+
+ self.waiters.with_locked(|waiters| unsafe {
+ waiters.insert(waiter);
+ });
+
+ unsafe { mutex.unlock() };
+
+ // Wait until `waiter` is removed from the queue
+ loop {
+ // Park the current task
+ expect_success_aborting(unsafe { abi::slp_tsk() }, &"slp_tsk");
+
+ if !self.waiters.with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) {
+ break;
+ }
+ }
+
+ unsafe { mutex.lock() };
+ }
+
+ pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
+ // Construct and pin `Waiter`
+ let mut waiter = waiter_queue::Waiter::new();
+ let waiter = NonNull::from(&mut waiter);
+
+ self.waiters.with_locked(|waiters| unsafe {
+ waiters.insert(waiter);
+ });
+
+ unsafe { mutex.unlock() };
+
+ // Park the current task and do not wake up until the timeout elapses
+ // or the task gets woken up by `notify_*`
+ match with_tmos_strong(dur, |tmo| {
+ let er = unsafe { abi::tslp_tsk(tmo) };
+ if er == 0 {
+ // We were unparked. Are we really dequeued?
+ if self.waiters.with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) {
+ // No we are not. Continue waiting.
+ return abi::E_TMOUT;
+ }
+ }
+ er
+ }) {
+ abi::E_TMOUT => {}
+ er => {
+ expect_success_aborting(er, &"tslp_tsk");
+ }
+ }
+
+ // Remove `waiter` from `self.waiters`. If `waiter` is still in
+ // `waiters`, it means we woke up because of a timeout. Otherwise,
+ // we woke up because of `notify_*`.
+ let success = self.waiters.with_locked(|waiters| unsafe { !waiters.remove(waiter) });
+
+ unsafe { mutex.lock() };
+ success
+ }
+}
+
+mod waiter_queue {
+ use super::*;
+
+ pub struct WaiterQueue {
+ head: Option<ListHead>,
+ }
+
+ #[derive(Copy, Clone)]
+ struct ListHead {
+ first: NonNull<Waiter>,
+ last: NonNull<Waiter>,
+ }
+
+ unsafe impl Send for ListHead {}
+ unsafe impl Sync for ListHead {}
+
+ pub struct Waiter {
+ // These fields are only accessed through `&[mut] WaiterQueue`.
+ /// The waiting task's ID. Will be zeroed when the task is woken up
+ /// and removed from a queue.
+ task: abi::ID,
+ priority: abi::PRI,
+ prev: Option<NonNull<Waiter>>,
+ next: Option<NonNull<Waiter>>,
+ }
+
+ unsafe impl Send for Waiter {}
+ unsafe impl Sync for Waiter {}
+
+ impl Waiter {
+ #[inline]
+ pub fn new() -> Self {
+ let task = task::current_task_id();
+ let priority = task::task_priority(abi::TSK_SELF);
+
+ // Zeroness of `Waiter::task` indicates whether the `Waiter` is
+ // linked to a queue or not. This invariant is important for
+ // the correctness.
+ debug_assert_ne!(task, 0);
+
+ Self { task, priority, prev: None, next: None }
+ }
+ }
+
+ impl WaiterQueue {
+ #[inline]
+ pub const fn new() -> Self {
+ Self { head: None }
+ }
+
+ /// # Safety
+ ///
+ /// - The caller must own `*waiter_ptr`. The caller will lose the
+ /// ownership until `*waiter_ptr` is removed from `self`.
+ ///
+ /// - `*waiter_ptr` must be valid until it's removed from the queue.
+ ///
+ /// - `*waiter_ptr` must not have been previously inserted to a `WaiterQueue`.
+ ///
+ pub unsafe fn insert(&mut self, mut waiter_ptr: NonNull<Waiter>) {
+ unsafe {
+ let waiter = waiter_ptr.as_mut();
+
+ debug_assert!(waiter.prev.is_none());
+ debug_assert!(waiter.next.is_none());
+
+ if let Some(head) = &mut self.head {
+ // Find the insertion position and insert `waiter`
+ let insert_after = {
+ let mut cursor = head.last;
+ loop {
+ if waiter.priority >= cursor.as_ref().priority {
+ // `cursor` and all previous waiters have the same or higher
+ // priority than `current_task_priority`. Insert the new
+ // waiter right after `cursor`.
+ break Some(cursor);
+ }
+ cursor = if let Some(prev) = cursor.as_ref().prev {
+ prev
+ } else {
+ break None;
+ };
+ }
+ };
+
+ if let Some(mut insert_after) = insert_after {
+ // Insert `waiter` after `insert_after`
+ let insert_before = insert_after.as_ref().next;
+
+ waiter.prev = Some(insert_after);
+ insert_after.as_mut().next = Some(waiter_ptr);
+
+ waiter.next = insert_before;
+ if let Some(mut insert_before) = insert_before {
+ insert_before.as_mut().prev = Some(waiter_ptr);
+ } else {
+ head.last = waiter_ptr;
+ }
+ } else {
+ // Insert `waiter` to the front
+ waiter.next = Some(head.first);
+ head.first.as_mut().prev = Some(waiter_ptr);
+ head.first = waiter_ptr;
+ }
+ } else {
+ // `waiter` is the only element
+ self.head = Some(ListHead { first: waiter_ptr, last: waiter_ptr });
+ }
+ }
+ }
+
+ /// Given a `Waiter` that was previously inserted to `self`, remove
+ /// it from `self` if it's still there.
+ #[inline]
+ pub unsafe fn remove(&mut self, mut waiter_ptr: NonNull<Waiter>) -> bool {
+ unsafe {
+ let waiter = waiter_ptr.as_mut();
+ if waiter.task != 0 {
+ let head = self.head.as_mut().unwrap();
+
+ match (waiter.prev, waiter.next) {
+ (Some(mut prev), Some(mut next)) => {
+ prev.as_mut().next = Some(next);
+ next.as_mut().prev = Some(prev);
+ }
+ (None, Some(mut next)) => {
+ head.first = next;
+ next.as_mut().prev = None;
+ }
+ (Some(mut prev), None) => {
+ prev.as_mut().next = None;
+ head.last = prev;
+ }
+ (None, None) => {
+ self.head = None;
+ }
+ }
+
+ waiter.task = 0;
+
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ /// Given a `Waiter` that was previously inserted to `self`, return a
+ /// flag indicating whether it's still in `self`.
+ #[inline]
+ pub unsafe fn is_queued(&self, waiter: NonNull<Waiter>) -> bool {
+ unsafe { waiter.as_ref().task != 0 }
+ }
+
+ #[inline]
+ pub fn pop_front(&mut self) -> Option<abi::ID> {
+ unsafe {
+ let head = self.head.as_mut()?;
+ let waiter = head.first.as_mut();
+
+ // Get the ID
+ let id = replace(&mut waiter.task, 0);
+
+ // Unlink the waiter
+ if let Some(mut next) = waiter.next {
+ head.first = next;
+ next.as_mut().prev = None;
+ } else {
+ self.head = None;
+ }
+
+ Some(id)
+ }
+ }
+ }
+}