summaryrefslogtreecommitdiffstats
path: root/xpcom/rust/moz_task/src/executor.rs
blob: 00168393735bf71c383463fc1ce868354e7b9918 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use crate::{get_current_thread, DispatchOptions, RunnableBuilder};
use std::{
    cell::Cell,
    fmt::Debug,
    future::Future,
    pin::Pin,
    ptr,
    sync::Arc,
    task::{Context, Poll},
};
use xpcom::interfaces::{nsIEventTarget, nsIRunnablePriority};
use xpcom::RefPtr;

/// A handle to a spawned task.
///
/// An [`AsyncTask`] can be awaited to retrieve the output of its future.
///
/// Dropping an [`AsyncTask`] cancels it, which means its future won't be polled
/// again. To drop the [`AsyncTask`] handle without canceling it, use
/// [`detach()`][`AsyncTask::detach()`] instead. To cancel a task gracefully and
/// wait until it is fully destroyed, use the [`cancel()`][AsyncTask::cancel()]
/// method.
///
/// A task which is cancelled due to the nsIEventTarget it was dispatched to no
/// longer accepting events will never be resolved: awaiting it stays pending
/// forever.
#[derive(Debug)]
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct AsyncTask<T> {
    // Fallible form of the underlying `async_task` task. Polling it yields an
    // `Option<T>`, which the `Future` impl uses to tell a finished task apart
    // from one that will never produce a value.
    task: async_task::FallibleTask<T>,
}

impl<T> AsyncTask<T> {
    /// Wraps a freshly spawned `async_task` task, converting it to its
    /// fallible form first.
    fn new(task: async_task::Task<T>) -> Self {
        let task = task.fallible();
        Self { task }
    }

    /// Detaches the task so that it keeps running in the background without
    /// this handle.
    pub fn detach(self) {
        let Self { task } = self;
        task.detach()
    }

    /// Cancels the task and waits for it to stop running.
    ///
    /// Returns the task's output if it was completed just before it got
    /// canceled, or [`None`] if it didn't complete.
    ///
    /// While it's possible to simply drop the [`AsyncTask`] to cancel it, this
    /// is a cleaner way of canceling because it also waits for the task to
    /// stop running.
    pub async fn cancel(self) -> Option<T> {
        let Self { task } = self;
        task.cancel().await
    }
}

impl<T> Future for AsyncTask<T> {
    type Output = T;

    /// Polls the wrapped fallible task.
    ///
    /// Only a task that actually produced a value resolves this future. If
    /// the inner task reports that it finished without one (the Runnable was
    /// dropped and the task was cancelled), that is reported as `Pending`, so
    /// the future deliberately never resolves.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::new(&mut self.task);
        if let Poll::Ready(Some(output)) = inner.poll(cx) {
            Poll::Ready(output)
        } else {
            // Either still running, or cancelled without an output.
            Poll::Pending
        }
    }
}

/// Where the runnables driving a spawned task are dispatched.
enum SpawnTarget {
    /// Dispatch through `RunnableBuilder::dispatch_background_task`.
    BackgroundTask,
    /// Dispatch to this specific event target through
    /// `RunnableBuilder::dispatch`.
    EventTarget(RefPtr<nsIEventTarget>),
}

// SAFETY: All XPCOM interfaces are considered !Send + !Sync, however all
// well-behaved nsIEventTarget instances must be threadsafe. The
// `RefPtr<nsIEventTarget>` held by `SpawnTarget::EventTarget` is the only
// payload of this enum, so these impls rest entirely on that requirement.
unsafe impl Send for SpawnTarget {}
unsafe impl Sync for SpawnTarget {}

/// Information used by tasks as they are spawned. Stored in an Arc such that
/// their identity can be used for `POLLING_TASK`.
struct TaskSpawnConfig {
    /// Name given to every runnable built for the task.
    name: &'static str,
    /// Priority applied to every runnable built for the task.
    priority: u32,
    /// Base dispatch options; `schedule` derives the options for each
    /// individual dispatch from these.
    options: DispatchOptions,
    /// Where the task's runnables are dispatched.
    target: SpawnTarget,
}

thread_local! {
    /// Raw pointer to the TaskSpawnConfig for the currently polling task. Used
    /// to detect scheduling callbacks for a runnable while it is polled, to set
    /// `DISPATCH_AT_END` on the notification.
    ///
    /// Starts out null, and is only non-null while a runnable created by
    /// `schedule` is running on this thread. The pointer is compared for
    /// identity only; it is never dereferenced.
    static POLLING_TASK: Cell<*const TaskSpawnConfig> = Cell::new(ptr::null());
}

fn schedule(config: Arc<TaskSpawnConfig>, runnable: async_task::Runnable) {
    // If we're dispatching this task while it is currently running on the same
    // thread, set the `DISPATCH_AT_END` flag in the dispatch options to tell
    // our threadpool target to not bother to spin up another thread.
    let currently_polling = POLLING_TASK.with(|t| t.get() == Arc::as_ptr(&config));

    // SAFETY: We use the POLLING_TASK thread local to check if we meet the
    // requirements for `at_end`.
    let options = unsafe { config.options.at_end(currently_polling) };

    // Build the RunnableBuilder for our task to be dispatched.
    let config2 = config.clone();
    let builder = RunnableBuilder::new(config.name, move || {
        // Record the pointer for the currently executing task in the
        // POLLING_TASK thread-local so that nested dispatches can detect it.
        POLLING_TASK.with(|t| {
            let prev = t.get();
            t.set(Arc::as_ptr(&config2));
            runnable.run();
            t.set(prev);
        });
    })
    .priority(config.priority)
    .options(options);

    let rv = match &config.target {
        SpawnTarget::BackgroundTask => builder.dispatch_background_task(),
        SpawnTarget::EventTarget(target) => builder.dispatch(&*target),
    };
    if let Err(err) = rv {
        log::warn!(
            "dispatch for spawned task '{}' failed: {:?}",
            config.name,
            err
        );
    }
}

/// Helper for starting an async task which will run a future to completion.
///
/// Configure the task with the builder methods, then start it with one of the
/// `spawn*` methods.
#[derive(Debug)]
pub struct TaskBuilder<F> {
    // Name used for the runnables which drive the task.
    name: &'static str,
    // The future the task will run.
    future: F,
    // Priority for the task's runnables; `new` sets it to `PRIORITY_NORMAL`.
    priority: u32,
    // Options used when dispatching the task's runnables.
    options: DispatchOptions,
}

impl<F> TaskBuilder<F> {
    /// Creates a builder for a task called `name` which will run `future`.
    ///
    /// The builder starts out with normal priority and the default dispatch
    /// options.
    pub fn new(name: &'static str, future: F) -> TaskBuilder<F> {
        let priority = nsIRunnablePriority::PRIORITY_NORMAL;
        let options = DispatchOptions::default();
        Self {
            name,
            future,
            priority,
            options,
        }
    }

    /// Specify the priority of the task's runnables.
    pub fn priority(self, priority: u32) -> Self {
        Self { priority, ..self }
    }

    /// Specify options to use when dispatching the task, replacing any
    /// options set so far.
    pub fn options(self, options: DispatchOptions) -> Self {
        Self { options, ..self }
    }

    /// Set whether or not the event may block, and should be run on the IO
    /// thread pool. The other dispatch options are carried over.
    pub fn may_block(self, may_block: bool) -> Self {
        let options = self.options.may_block(may_block);
        Self { options, ..self }
    }
}

impl<F> TaskBuilder<F>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    /// Run the future on the background task pool.
    pub fn spawn(self) -> AsyncTask<F::Output> {
        let config = Arc::new(TaskSpawnConfig {
            name: self.name,
            priority: self.priority,
            options: self.options,
            target: SpawnTarget::BackgroundTask,
        });
        let (runnable, task) = async_task::spawn(self.future, move |runnable| {
            schedule(config.clone(), runnable)
        });
        runnable.schedule();
        AsyncTask::new(task)
    }

    /// Run the future on the specified nsIEventTarget.
    pub fn spawn_onto(self, target: &nsIEventTarget) -> AsyncTask<F::Output> {
        let config = Arc::new(TaskSpawnConfig {
            name: self.name,
            priority: self.priority,
            options: self.options,
            target: SpawnTarget::EventTarget(RefPtr::new(target)),
        });
        let (runnable, task) = async_task::spawn(self.future, move |runnable| {
            schedule(config.clone(), runnable)
        });
        runnable.schedule();
        AsyncTask::new(task)
    }
}

impl<F> TaskBuilder<F>
where
    F: Future + 'static,
    F::Output: 'static,
{
    /// Run the future on the current thread.
    ///
    /// Unlike the other `spawn` methods, this method supports non-Send futures.
    ///
    /// # Panics
    ///
    /// This method may panic if run on a thread which cannot run local futures
    /// (e.g. due to it is not being an XPCOM thread, or if we are very late
    /// during shutdown).
    pub fn spawn_local(self) -> AsyncTask<F::Output> {
        let current_thread = get_current_thread().expect("cannot get current thread");
        let config = Arc::new(TaskSpawnConfig {
            name: self.name,
            priority: self.priority,
            options: self.options,
            target: SpawnTarget::EventTarget(RefPtr::new(current_thread.coerce())),
        });
        let (runnable, task) = async_task::spawn_local(self.future, move |runnable| {
            schedule(config.clone(), runnable)
        });
        runnable.schedule();
        AsyncTask::new(task)
    }
}

/// Spawn a future onto the background task pool. The future will not be run on
/// the main thread.
///
/// Shorthand for `TaskBuilder::new(name, future).spawn()`, i.e. a task with
/// the builder's default priority and dispatch options. `name` is the name
/// given to the task's runnables.
pub fn spawn<F>(name: &'static str, future: F) -> AsyncTask<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    TaskBuilder::new(name, future).spawn()
}

/// Spawn a potentially-blocking future onto the background task pool. The
/// future will not be run on the main thread.
///
/// Same as [`spawn`], except that the task is marked with `may_block(true)`
/// before being spawned.
pub fn spawn_blocking<F>(name: &'static str, future: F) -> AsyncTask<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    TaskBuilder::new(name, future).may_block(true).spawn()
}

/// Spawn a local future onto the current thread.
///
/// Unlike [`spawn`], neither the future nor its output needs to be `Send`.
///
/// # Panics
///
/// Shorthand for `TaskBuilder::new(name, future).spawn_local()`, so it panics
/// whenever that method does (it expects to be able to get the current
/// thread).
pub fn spawn_local<F>(name: &'static str, future: F) -> AsyncTask<F::Output>
where
    F: Future + 'static,
    F::Output: 'static,
{
    TaskBuilder::new(name, future).spawn_local()
}

/// Spawn a future onto the specified nsIEventTarget.
///
/// Shorthand for `TaskBuilder::new(name, future).spawn_onto(target)`, i.e. a
/// task with the builder's default priority and dispatch options.
pub fn spawn_onto<F>(name: &'static str, target: &nsIEventTarget, future: F) -> AsyncTask<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    TaskBuilder::new(name, future).spawn_onto(target)
}

/// Spawn a potentially-blocking future onto the specified nsIEventTarget.
///
/// Same as [`spawn_onto`], except that the task is marked with
/// `may_block(true)` before being spawned.
pub fn spawn_onto_blocking<F>(
    name: &'static str,
    target: &nsIEventTarget,
    future: F,
) -> AsyncTask<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    TaskBuilder::new(name, future)
        .may_block(true)
        .spawn_onto(target)
}