summaryrefslogtreecommitdiffstats
path: root/library/std/src/sys/itron/condvar.rs
blob: 008cd8fb1e3926f3dbe5c65bde43afda92ddc31c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
//! POSIX condition variable implementation based on user-space wait queues.
use super::{abi, error::expect_success_aborting, spin::SpinMutex, task, time::with_tmos_strong};
use crate::{mem::replace, ptr::NonNull, sys::locks::Mutex, time::Duration};

// The implementation is inspired by the queue-based implementation shown in
// Andrew D. Birrell's paper "Implementing Condition Variables with Semaphores"

/// Condition variable that tracks its blocked tasks in a
/// `waiter_queue::WaiterQueue` guarded by a `SpinMutex`.
pub struct Condvar {
    /// Waiters registered by `wait`/`wait_timeout` and not yet dequeued.
    /// Every access in this file goes through `SpinMutex::with_locked`.
    waiters: SpinMutex<waiter_queue::WaiterQueue>,
}

// NOTE(review): `Condvar` holds nothing but the waiter queue, and every access
// to that queue in this file happens inside `SpinMutex::with_locked`.
// Presumably that is what makes sharing a `Condvar` between tasks sound —
// confirm against the `SpinMutex` implementation.
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}

/// Alias of [`Condvar`] exported under the `MovableCondvar` name.
/// NOTE(review): presumably consumed by platform-independent code that expects
/// this name — confirm against the callers.
pub type MovableCondvar = Condvar;

impl Condvar {
    /// Creates a condition variable with an empty waiter queue.
    #[inline]
    pub const fn new() -> Condvar {
        Self { waiters: SpinMutex::new(waiter_queue::WaiterQueue::new()) }
    }

    /// Does nothing; a value returned by [`Condvar::new`] needs no further
    /// set-up.
    #[inline]
    pub unsafe fn init(&mut self) {}

    /// Dequeues the waiter at the front of the queue, if any, and unparks
    /// its task.
    pub unsafe fn notify_one(&self) {
        self.waiters.with_locked(|queue| {
            if let Some(task) = queue.pop_front() {
                Self::unpark(task);
            }
        });
    }

    /// Dequeues every waiter, front to back, unparking each task in turn.
    ///
    /// The queue stays locked for the whole sweep.
    pub unsafe fn notify_all(&self) {
        self.waiters.with_locked(|queue| {
            while let Some(task) = queue.pop_front() {
                Self::unpark(task);
            }
        });
    }

    /// Blocks the calling task until a notifier has dequeued it.
    ///
    /// The waiter is queued *before* `mutex` is released, and `mutex` is
    /// re-acquired before returning.
    pub unsafe fn wait(&self, mutex: &Mutex) {
        // The queue node lives on this stack frame; it is guaranteed to be
        // unlinked again before the function returns.
        let mut slot = waiter_queue::Waiter::new();
        let waiter = NonNull::from(&mut slot);

        unsafe {
            self.enqueue(waiter);
            mutex.unlock();
        }

        // A wake-up alone is not proof of a notification, so keep parking
        // until the node has actually left the queue.
        let mut queued = true;
        while queued {
            // Park the current task
            expect_success_aborting(unsafe { abi::slp_tsk() }, &"slp_tsk");
            queued = unsafe { self.is_queued(waiter) };
        }

        unsafe { mutex.lock() };
    }

    /// Like [`Condvar::wait`], but gives up once `dur` has elapsed.
    ///
    /// Returns `true` if a notifier dequeued the waiter, and `false` if the
    /// waiter was still queued when waiting ended (i.e. the wait timed out).
    /// `mutex` is re-acquired before returning in both cases.
    pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
        // The queue node lives on this stack frame; the `remove` call below
        // guarantees it is unlinked before the function returns.
        let mut slot = waiter_queue::Waiter::new();
        let waiter = NonNull::from(&mut slot);

        unsafe {
            self.enqueue(waiter);
            mutex.unlock();
        }

        // Park the current task until either the timeout elapses or a
        // `notify_*` call wakes it up.
        let outcome = with_tmos_strong(dur, |tmo| {
            let er = unsafe { abi::tslp_tsk(tmo) };
            // Unparked but not dequeued: report a timeout for this round so
            // that waiting continues.
            let woken_but_queued = er == 0 && unsafe { self.is_queued(waiter) };
            if woken_but_queued { abi::E_TMOUT } else { er }
        });
        if outcome != abi::E_TMOUT {
            expect_success_aborting(outcome, &"tslp_tsk");
        }

        // Unlink the node. If there was still something to unlink, nobody
        // notified us and the wake-up was due to the timeout.
        let notified = self.waiters.with_locked(|queue| unsafe { !queue.remove(waiter) });

        unsafe { mutex.lock() };
        notified
    }

    /// Unparks `task`, aborting the program if the kernel reports a failure.
    fn unpark(task: abi::ID) {
        // NOTE(review): `wup_tsk` is an FFI call taking a plain task ID;
        // presumably it has no further memory-safety preconditions — confirm.
        let er = unsafe { abi::wup_tsk(task) };
        // `E_QOVR` means the task already has a token, which is fine. Any
        // other failure cannot be undone (the waiter is already dequeued).
        if er != abi::E_QOVR {
            expect_success_aborting(er, &"wup_tsk");
        }
    }

    /// Links `waiter` into the queue.
    ///
    /// # Safety
    ///
    /// Same contract as `WaiterQueue::insert`.
    unsafe fn enqueue(&self, waiter: NonNull<waiter_queue::Waiter>) {
        self.waiters.with_locked(|queue| unsafe { queue.insert(waiter) })
    }

    /// Reports whether `waiter` is still linked into the queue.
    ///
    /// # Safety
    ///
    /// `waiter` must point to a live `Waiter` previously passed to
    /// `enqueue` on this `Condvar`.
    unsafe fn is_queued(&self, waiter: NonNull<waiter_queue::Waiter>) -> bool {
        self.waiters.with_locked(|queue| unsafe { queue.is_queued(waiter) })
    }
}

mod waiter_queue {
    use super::*;

    /// Intrusive doubly linked list of [`Waiter`]s, ordered by ascending
    /// `priority` value; waiters with equal values keep insertion order.
    pub struct WaiterQueue {
        /// Both ends of the list, or `None` while the queue is empty.
        head: Option<ListHead>,
    }

    /// The two ends of a non-empty list.
    #[derive(Copy, Clone)]
    struct ListHead {
        first: NonNull<Waiter>,
        last: NonNull<Waiter>,
    }

    unsafe impl Send for ListHead {}
    unsafe impl Sync for ListHead {}

    /// A list node describing one waiting task.
    pub struct Waiter {
        // All fields are only accessed through `&[mut] WaiterQueue`.
        /// ID of the waiting task. Reset to zero once the waiter has been
        /// taken out of a queue, so "non-zero" doubles as "still queued".
        task: abi::ID,
        /// Sort key captured when the waiter was created.
        priority: abi::PRI,
        prev: Option<NonNull<Waiter>>,
        next: Option<NonNull<Waiter>>,
    }

    unsafe impl Send for Waiter {}
    unsafe impl Sync for Waiter {}

    impl Waiter {
        /// Builds an unlinked waiter for the calling task, capturing its ID
        /// and its current priority.
        #[inline]
        pub fn new() -> Self {
            let this = Self {
                task: task::current_task_id(),
                priority: task::task_priority(abi::TSK_SELF),
                prev: None,
                next: None,
            };

            // A zero `task` is how the queue marks a waiter as "not linked",
            // so a real task ID must never be zero.
            debug_assert_ne!(this.task, 0);

            this
        }
    }

    impl WaiterQueue {
        /// Creates an empty queue.
        #[inline]
        pub const fn new() -> Self {
            Self { head: None }
        }

        /// Links `*waiter_ptr` into the queue behind every waiter whose
        /// `priority` value is less than or equal to its own.
        ///
        /// # Safety
        ///
        ///  - The caller must own `*waiter_ptr`, and gives that ownership up
        ///    until `*waiter_ptr` has been removed from `self` again.
        ///
        ///  - `*waiter_ptr` must stay valid until it is removed from the queue.
        ///
        ///  - `*waiter_ptr` must never have been inserted into a
        ///    `WaiterQueue` before.
        ///
        pub unsafe fn insert(&mut self, mut waiter_ptr: NonNull<Waiter>) {
            // SAFETY: the caller grants exclusive access to `*waiter_ptr`, and
            // every node already linked is still valid under the same contract.
            unsafe {
                let new = waiter_ptr.as_mut();

                debug_assert!(new.prev.is_none());
                debug_assert!(new.next.is_none());

                let head = match self.head.as_mut() {
                    Some(head) => head,
                    None => {
                        // Empty queue: the new waiter is both ends.
                        self.head = Some(ListHead { first: waiter_ptr, last: waiter_ptr });
                        return;
                    }
                };

                // Scan from the back for the last node that must stay ahead
                // of the new waiter. `None` means the scan ran off the front.
                let mut anchor = Some(head.last);
                while let Some(candidate) = anchor {
                    if new.priority >= candidate.as_ref().priority {
                        break;
                    }
                    anchor = candidate.as_ref().prev;
                }

                match anchor {
                    Some(mut before) => {
                        // Splice in between `before` and its old successor.
                        let after = before.as_ref().next;

                        new.prev = Some(before);
                        new.next = after;
                        before.as_mut().next = Some(waiter_ptr);

                        match after {
                            Some(mut after) => after.as_mut().prev = Some(waiter_ptr),
                            None => head.last = waiter_ptr,
                        }
                    }
                    None => {
                        // The new waiter sorts ahead of everything queued.
                        new.next = Some(head.first);
                        head.first.as_mut().prev = Some(waiter_ptr);
                        head.first = waiter_ptr;
                    }
                }
            }
        }

        /// Unlinks a `Waiter` that was previously inserted into `self`,
        /// provided it is still linked.
        ///
        /// Returns `true` if the waiter was unlinked by this call and `false`
        /// if it had already left the queue.
        ///
        /// # Safety
        ///
        /// `waiter_ptr` must point to a live `Waiter` that was previously
        /// inserted into `self`.
        ///
        /// # Panics
        ///
        /// Panics if the waiter is marked as queued while the queue is empty.
        #[inline]
        pub unsafe fn remove(&mut self, mut waiter_ptr: NonNull<Waiter>) -> bool {
            // SAFETY: `*waiter_ptr` is live per the contract above, and its
            // neighbours are live per `insert`'s contract.
            unsafe {
                let waiter = waiter_ptr.as_mut();
                if waiter.task == 0 {
                    // Already dequeued; nothing to do.
                    return false;
                }

                let head = self.head.as_mut().unwrap();
                let (before, after) = (waiter.prev, waiter.next);

                // Make the neighbours point past the departing waiter.
                if let Some(mut prev) = before {
                    prev.as_mut().next = after;
                }
                if let Some(mut next) = after {
                    next.as_mut().prev = before;
                }

                // Repair whichever list end the waiter occupied.
                match (before, after) {
                    (Some(_), Some(_)) => {}
                    (None, Some(next)) => head.first = next,
                    (Some(prev), None) => head.last = prev,
                    (None, None) => self.head = None,
                }

                // Mark the waiter as no longer queued.
                waiter.task = 0;

                true
            }
        }

        /// Reports whether a `Waiter` that was previously inserted into
        /// `self` is still linked.
        ///
        /// # Safety
        ///
        /// `waiter` must point to a live `Waiter`.
        #[inline]
        pub unsafe fn is_queued(&self, waiter: NonNull<Waiter>) -> bool {
            // SAFETY: the caller guarantees that `*waiter` is live.
            let waiter = unsafe { waiter.as_ref() };
            waiter.task != 0
        }

        /// Unlinks the waiter at the front of the queue and returns its task
        /// ID, or returns `None` if the queue is empty.
        #[inline]
        pub fn pop_front(&mut self) -> Option<abi::ID> {
            let head = self.head.as_mut()?;

            // SAFETY: linked nodes are valid per `insert`'s contract.
            let front = unsafe { head.first.as_mut() };

            // Take the ID; zeroing it marks the waiter as dequeued.
            let id = replace(&mut front.task, 0);

            // Promote the successor, if any, to be the new front.
            match front.next {
                Some(mut next) => {
                    // SAFETY: as above, `next` is a linked and thus valid node.
                    unsafe { next.as_mut().prev = None };
                    head.first = next;
                }
                None => self.head = None,
            }

            Some(id)
        }
    }
}