summaryrefslogtreecommitdiffstats
path: root/library/std/src/sys_common/thread_parking/id.rs
blob: 575988ec760c7a84cd5109b3e7754d51b756816a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
//! Thread parking using thread ids.
//!
//! Some platforms (notably NetBSD) have thread parking primitives whose semantics
//! match those offered by `thread::park`, with the difference that the thread to
//! be unparked is referenced by a platform-specific thread id. Since the thread
//! parker is constructed before that id is known, an atomic state variable is used
//! to manage the park state and propagate the thread id. This also avoids platform
//! calls in the case where `unpark` is called before `park`.

use crate::cell::UnsafeCell;
use crate::pin::Pin;
use crate::sync::atomic::{
    fence, AtomicI8,
    Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId};
use crate::time::Duration;

pub struct Parker {
    state: AtomicI8,
    tid: UnsafeCell<Option<ThreadId>>,
}

const PARKED: i8 = -1;
const EMPTY: i8 = 0;
const NOTIFIED: i8 = 1;

impl Parker {
    pub fn new() -> Parker {
        Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) }
    }

    /// Create a new thread parker. UNIX requires this to happen in-place.
    pub unsafe fn new_in_place(parker: *mut Parker) {
        parker.write(Parker::new())
    }

    /// # Safety
    /// * must always be called from the same thread
    /// * must be called before the state is set to PARKED
    unsafe fn init_tid(&self) {
        // The field is only ever written to from this thread, so we don't need
        // synchronization to read it here.
        if self.tid.get().read().is_none() {
            // Because this point is only reached once, before the state is set
            // to PARKED for the first time, the non-atomic write here can not
            // conflict with reads by other threads.
            self.tid.get().write(Some(current()));
            // Ensure that the write can be observed by all threads reading the
            // state. Synchronizes with the acquire barrier in `unpark`.
            fence(Release);
        }
    }

    pub unsafe fn park(self: Pin<&Self>) {
        self.init_tid();

        // Changes NOTIFIED to EMPTY and EMPTY to PARKED.
        let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
        if state == PARKED {
            // Loop to guard against spurious wakeups.
            while state == PARKED {
                park(self.state.as_ptr().addr());
                state = self.state.load(Acquire);
            }

            // Since the state change has already been observed with acquire
            // ordering, the state can be reset with a relaxed store instead
            // of a swap.
            self.state.store(EMPTY, Relaxed);
        }
    }

    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
        self.init_tid();

        let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
        if state == PARKED {
            park_timeout(dur, self.state.as_ptr().addr());
            // Swap to ensure that we observe all state changes with acquire
            // ordering, even if the state has been changed after the timeout
            // occured.
            self.state.swap(EMPTY, Acquire);
        }
    }

    pub fn unpark(self: Pin<&Self>) {
        let state = self.state.swap(NOTIFIED, Release);
        if state == PARKED {
            // Synchronize with the release fence in `init_tid` to observe the
            // write to `tid`.
            fence(Acquire);
            // # Safety
            // The thread id is initialized before the state is set to `PARKED`
            // for the first time and is not written to from that point on
            // (negating the need for an atomic read).
            let tid = unsafe { self.tid.get().read().unwrap_unchecked() };
            // It is possible that the waiting thread woke up because of a timeout
            // and terminated before this call is made. This call then returns an
            // error or wakes up an unrelated thread. The platform API and
            // environment does allow this, however.
            unpark(tid, self.state.as_ptr().addr());
        }
    }
}

unsafe impl Send for Parker {}
unsafe impl Sync for Parker {}