diff options
Diffstat (limited to 'services/sync/golden_gate')
-rw-r--r-- | services/sync/golden_gate/Cargo.toml | 25 | ||||
-rw-r--r-- | services/sync/golden_gate/src/error.rs | 71 | ||||
-rw-r--r-- | services/sync/golden_gate/src/ferry.rs | 74 | ||||
-rw-r--r-- | services/sync/golden_gate/src/lib.rs | 119 | ||||
-rw-r--r-- | services/sync/golden_gate/src/log.rs | 161 | ||||
-rw-r--r-- | services/sync/golden_gate/src/task.rs | 355 |
6 files changed, 805 insertions, 0 deletions
diff --git a/services/sync/golden_gate/Cargo.toml b/services/sync/golden_gate/Cargo.toml new file mode 100644 index 0000000000..3f94e1a1e9 --- /dev/null +++ b/services/sync/golden_gate/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "golden_gate" +description = "A bridge for wiring up Sync engines implemented in Rust" +version = "0.1.0" +authors = ["The Firefox Sync Developers <sync-team@mozilla.com>"] +edition = "2018" +license = "MPL-2.0" + +[dependencies] +anyhow = "1" +atomic_refcell = "0.1" +cstr = "0.2" +interrupt-support = "0.1" +log = "0.4" +moz_task = { path = "../../../xpcom/rust/moz_task" } +nserror = { path = "../../../xpcom/rust/nserror" } +nsstring = { path = "../../../xpcom/rust/nsstring" } +serde_json = "1" +storage_variant = { path = "../../../storage/variant" } +sync15 = "0.1" +xpcom = { path = "../../../xpcom/rust/xpcom" } + +[dependencies.thin-vec] +version = "0.2.1" +features = ["gecko-ffi"] diff --git a/services/sync/golden_gate/src/error.rs b/services/sync/golden_gate/src/error.rs new file mode 100644 index 0000000000..373d20756e --- /dev/null +++ b/services/sync/golden_gate/src/error.rs @@ -0,0 +1,71 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::{error, fmt, result, str::Utf8Error}; + +use nserror::{nsresult, NS_ERROR_INVALID_ARG, NS_ERROR_UNEXPECTED}; +use serde_json::Error as JsonError; + +/// A specialized `Result` type for Golden Gate. +pub type Result<T> = result::Result<T, Error>; + +/// The error type for Golden Gate errors. +#[derive(Debug)] +pub enum Error { + /// A wrapped XPCOM error. + Nsresult(nsresult), + + /// A ferry didn't run on the background task queue. + DidNotRun(&'static str), + + /// A string contains invalid UTF-8 or JSON. + MalformedString(Box<dyn error::Error + Send + Sync + 'static>), +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + Error::MalformedString(error) => Some(error.as_ref()), + _ => None, + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Nsresult(result) => write!(f, "Operation failed with {}", result.error_name()), + Error::DidNotRun(what) => write!(f, "Failed to run `{what}` on background thread"), + Error::MalformedString(error) => error.fmt(f), + } + } +} + +impl From<nsresult> for Error { + fn from(result: nsresult) -> Error { + Error::Nsresult(result) + } +} + +impl From<Utf8Error> for Error { + fn from(error: Utf8Error) -> Error { + Error::MalformedString(error.into()) + } +} + +impl From<JsonError> for Error { + fn from(error: JsonError) -> Error { + Error::MalformedString(error.into()) + } +} + +impl From<Error> for nsresult { + fn from(error: Error) -> nsresult { + match error { + Error::DidNotRun(_) => NS_ERROR_UNEXPECTED, + Error::Nsresult(result) => result, + Error::MalformedString(_) => NS_ERROR_INVALID_ARG, + } + } +} diff --git a/services/sync/golden_gate/src/ferry.rs b/services/sync/golden_gate/src/ferry.rs new file mode 100644 index 0000000000..99994811ab --- /dev/null +++ b/services/sync/golden_gate/src/ferry.rs @@ -0,0 +1,74 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use nsstring::nsCString; +use storage_variant::VariantType; +use sync15::Guid; +use xpcom::{interfaces::nsIVariant, RefPtr}; + +/// An operation that runs on the background thread, and optionally passes a +/// result to its callback. +pub enum Ferry { + LastSync, + SetLastSync(i64), + SyncId, + ResetSyncId, + EnsureCurrentSyncId(String), + SyncStarted, + StoreIncoming(Vec<nsCString>), + SetUploaded(i64, Vec<Guid>), + SyncFinished, + Reset, + Wipe, +} + +impl Ferry { + /// Returns the operation name for debugging and labeling the task + /// runnable. + pub fn name(&self) -> &'static str { + match self { + Ferry::LastSync => concat!(module_path!(), "getLastSync"), + Ferry::SetLastSync(_) => concat!(module_path!(), "setLastSync"), + Ferry::SyncId => concat!(module_path!(), "getSyncId"), + Ferry::ResetSyncId => concat!(module_path!(), "resetSyncId"), + Ferry::EnsureCurrentSyncId(_) => concat!(module_path!(), "ensureCurrentSyncId"), + Ferry::SyncStarted => concat!(module_path!(), "syncStarted"), + Ferry::StoreIncoming { .. } => concat!(module_path!(), "storeIncoming"), + Ferry::SetUploaded { .. } => concat!(module_path!(), "setUploaded"), + Ferry::SyncFinished => concat!(module_path!(), "syncFinished"), + Ferry::Reset => concat!(module_path!(), "reset"), + Ferry::Wipe => concat!(module_path!(), "wipe"), + } + } +} + +/// The result of a ferry task, sent from the background thread back to the +/// main thread. Results are converted to variants, and passed as arguments to +/// `mozIBridgedSyncEngineCallback`s. +pub enum FerryResult { + LastSync(i64), + SyncId(Option<String>), + AssignedSyncId(String), + Null, +} + +impl Default for FerryResult { + fn default() -> Self { + FerryResult::Null + } +} + +impl FerryResult { + /// Converts the result to an `nsIVariant` that can be passed as an + /// argument to `callback.handleResult()`. + pub fn into_variant(self) -> RefPtr<nsIVariant> { + match self { + FerryResult::LastSync(v) => v.into_variant(), + FerryResult::SyncId(Some(v)) => nsCString::from(v).into_variant(), + FerryResult::SyncId(None) => ().into_variant(), + FerryResult::AssignedSyncId(v) => nsCString::from(v).into_variant(), + FerryResult::Null => ().into_variant(), + } + } +} diff --git a/services/sync/golden_gate/src/lib.rs b/services/sync/golden_gate/src/lib.rs new file mode 100644 index 0000000000..8da6524bd7 --- /dev/null +++ b/services/sync/golden_gate/src/lib.rs @@ -0,0 +1,119 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! **Golden Gate** 🌉 is a crate for bridging Desktop Sync to our suite of +//! Rust sync and storage components. It connects Sync's `BridgedEngine` class +//! to the Rust `BridgedEngine` trait via the `mozIBridgedSyncEngine` XPCOM +//! interface. +//! +//! Due to limitations in implementing XPCOM interfaces for generic types, +//! Golden Gate doesn't implement `mozIBridgedSyncEngine` directly. Instead, +//! it provides helpers, called "ferries", for passing Sync records between +//! JavaScript and Rust. The ferries also handle threading and type +//! conversions. +//! +//! Here's a step-by-step guide for adding a new Rust Sync engine to Firefox. +//! +//! ## Step 1: Create your (XPCOM) bridge +//! +//! In your consuming crate, define a type for your `mozIBridgedSyncEngine` +//! implementation. We'll call this type the **brige**. The bridge is +//! responsible for exposing your Sync engine to XPIDL [^1], in a way that lets +//! JavaScript call it. +//! +//! For your bridge type, you'll need to implement an xpcom interface with the +//! `#[xpcom(implement(mozIBridgedSyncEngine), nonatomic)]` attribute then +//! define `xpcom_method!()` stubs for the `mozIBridgedSyncEngine` methods. For +//! more details about implementing XPCOM methods in Rust, check out the docs in +//! `xpcom/rust/xpcom/src/method.rs`. +//! +//! You'll also need to add an entry for your bridge type to `components.conf`, +//! and define C++ and Rust constructors for it, so that JavaScript code can +//! create instances of it. Check out `NS_NewWebExtStorage` (and, in C++, +//! `mozilla::extensions::storageapi::NewWebExtStorage`) and +//! `NS_NewSyncedBookmarksMerger` (`mozilla::places::NewSyncedBookmarksMerger` +//! in C++) for how to do this. +//! +//! [^1]: You can think of XPIDL as a souped-up C FFI, with richer types and a +//! degree of type safety. +//! +//! ## Step 2: Add a background task queue to your bridge +//! +//! A task queue lets your engine do I/O, merging, and other syncing tasks on a +//! background thread pool. This is important because database reads and writes +//! can take an unpredictable amount of time. Doing these on the main thread can +//! cause jank, and, in the worst case, lock up the browser UI for seconds at a +//! time. +//! +//! The `moz_task` crate provides a `create_background_task_queue` function to +//! do this. Once you have a queue, you can use it to call into your Rust +//! engine. Golden Gate takes care of ferrying arguments back and forth across +//! the thread boundary. +//! +//! Since it's a queue, ferries arrive in the order they're scheduled, so +//! your engine's `store_incoming` method will always be called before `apply`, +//! which is likewise called before `set_uploaded`. The thread manager scales +//! the pool for you; you don't need to create or manage your own threads. +//! +//! ## Step 3: Create your Rust engine +//! +//! Next, you'll need to implement the Rust side of the bridge. This is a type +//! that implements the `BridgedEngine` trait. +//! +//! Bridged engines handle storing incoming Sync records, merging changes, +//! resolving conflicts, and fetching outgoing records for upload. Under the +//! hood, your engine will hold either a database connection directly, or +//! another object that does. +//! +//! Although outside the scope of Golden Gate, your engine will also likely +//! expose a data storage API, for fetching, updating, and deleting items +//! locally. Golden Gate provides the syncing layer on top of this local store. +//! +//! A `BridgedEngine` itself doesn't need to be `Send` or `Sync`, but the +//! ferries require both, since they're calling into your bridge on the +//! background task queue. +//! +//! In practice, this means your bridge will need to hold a thread-safe owned +//! reference to the engine, via `Arc<Mutex<BridgedEngine>>`. In fact, this +//! pattern is so common that Golden Gate implements `BridgedEngine` for any +//! `Mutex<BridgedEngine>`, which automatically locks the mutex before calling +//! into the engine. +//! +//! ## Step 4: Connect the bridge to the JavaScript and Rust sides +//! +//! On the JavaScript side, you'll need to subclass Sync's `BridgedEngine` +//! class, and give it a handle to your XPCOM bridge. The base class has all the +//! machinery for hooking up any `mozIBridgedSyncEngine` implementation so that +//! Sync can drive it. +//! +//! On the Rust side, each `mozIBridgedSyncEngine` method should create a +//! Golden Gate ferry, and dispatch it to the background task queue. The +//! ferries correspond to the method names. For example, `ensureCurrentSyncId` +//! should create a `Ferry::ensure_current_sync_id(...)`; `storeIncoming`, a +//! `Ferry::store_incoming(...)`; and so on. This is mostly boilerplate. +//! +//! And that's it! Each ferry will, in turn, call into your Rust +//! `BridgedEngine`, and send the results back to JavaScript. +//! +//! For an example of how all this works, including exposing a storage (not +//! just syncing!) API to JS via XPIDL, check out `webext_storage::Bridge` for +//! the `storage.sync` API! + +#[macro_use] +extern crate cstr; + +pub mod error; +mod ferry; +pub mod log; +pub mod task; + +pub use crate::log::LogSink; +pub use error::{Error, Result}; +// Re-export items from `interrupt-support` and `sync15`, so that +// consumers of `golden_gate` don't have to depend on them. +pub use interrupt_support::{Interrupted, Interruptee}; +pub use sync15::bso::{IncomingBso, OutgoingBso}; +pub use sync15::engine::{ApplyResults, BridgedEngine}; +pub use sync15::Guid; +pub use task::{ApplyTask, FerryTask}; diff --git a/services/sync/golden_gate/src/log.rs b/services/sync/golden_gate/src/log.rs new file mode 100644 index 0000000000..de7fd0dfc3 --- /dev/null +++ b/services/sync/golden_gate/src/log.rs @@ -0,0 +1,161 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::fmt::{self, Write}; + +use log::{Level, LevelFilter, Log, Metadata, Record}; +use moz_task::{Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder}; +use nserror::nsresult; +use nsstring::nsString; +use xpcom::{interfaces::mozIServicesLogSink, RefPtr}; + +pub struct LogSink { + pub max_level: LevelFilter, + logger: Option<ThreadPtrHandle<mozIServicesLogSink>>, +} + +impl Default for LogSink { + fn default() -> Self { + LogSink { + max_level: LevelFilter::Off, + logger: None, + } + } +} + +impl LogSink { + /// Creates a log sink that adapts the Rust `log` crate to the Sync + /// `Log.sys.mjs` logger. + /// + /// This is copied from `bookmark_sync::Logger`. It would be nice to share + /// these, but, for now, we've just duplicated it to make prototyping + /// easier. + #[inline] + pub fn new(max_level: LevelFilter, logger: ThreadPtrHandle<mozIServicesLogSink>) -> LogSink { + LogSink { + max_level, + logger: Some(logger), + } + } + + /// Creates a log sink using the given Services `logger` as the + /// underlying implementation. The `logger` will always be called + /// asynchronously on its owning thread; it doesn't need to be + /// thread-safe. + pub fn with_logger(logger: Option<&mozIServicesLogSink>) -> Result<LogSink, nsresult> { + Ok(if let Some(logger) = logger { + // Fetch the maximum log level while we're on the main thread, so + // that `LogSink::enabled()` can check it while on the background + // thread. Otherwise, we'd need to dispatch a `LogTask` for every + // log message, only to discard most of them when the task calls + // into the logger on the main thread. + let mut raw_max_level = 0i16; + let rv = unsafe { logger.GetMaxLevel(&mut raw_max_level) }; + let max_level = if rv.succeeded() { + match raw_max_level { + mozIServicesLogSink::LEVEL_ERROR => LevelFilter::Error, + mozIServicesLogSink::LEVEL_WARN => LevelFilter::Warn, + mozIServicesLogSink::LEVEL_DEBUG => LevelFilter::Debug, + mozIServicesLogSink::LEVEL_TRACE => LevelFilter::Trace, + mozIServicesLogSink::LEVEL_INFO => LevelFilter::Info, + _ => LevelFilter::Off, + } + } else { + LevelFilter::Off + }; + LogSink::new( + max_level, + ThreadPtrHolder::new(cstr!("mozIServicesLogSink"), RefPtr::new(logger))?, + ) + } else { + LogSink::default() + }) + } + + /// Returns a reference to the underlying `mozIServicesLogSink`. + pub fn logger(&self) -> Option<&mozIServicesLogSink> { + self.logger.as_ref().and_then(|l| l.get()) + } + + /// Logs a message to the Sync logger, if one is set. This would be better + /// implemented as a macro, as Dogear does, so that we can pass variadic + /// arguments without manually invoking `fmt_args!()` every time we want + /// to log a message. + /// + /// The `log` crate's macros aren't suitable here, because those log to the + /// global logger. However, we don't want to set the global logger in our + /// crate, because that will log _everything_ that uses the Rust `log` crate + /// to the Sync logs, including WebRender and audio logging. + pub fn debug(&self, args: fmt::Arguments) { + let meta = Metadata::builder() + .level(Level::Debug) + .target(module_path!()) + .build(); + if self.enabled(&meta) { + self.log(&Record::builder().args(args).metadata(meta).build()); + } + } +} + +impl Log for LogSink { + #[inline] + fn enabled(&self, meta: &Metadata) -> bool { + self.logger.is_some() && meta.level() <= self.max_level + } + + fn log(&self, record: &Record) { + if !self.enabled(record.metadata()) { + return; + } + if let Some(logger) = &self.logger { + let mut message = nsString::new(); + if write!(message, "{}", record.args()).is_ok() { + let task = LogTask { + logger: logger.clone(), + level: record.metadata().level(), + message, + }; + let _ = TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task)) + .and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread())); + } + } + } + + fn flush(&self) {} +} + +/// Logs a message to the mirror logger. This task is created on the background +/// thread queue, and dispatched to the main thread. +struct LogTask { + logger: ThreadPtrHandle<mozIServicesLogSink>, + level: Level, + message: nsString, +} + +impl Task for LogTask { + fn run(&self) { + let logger = self.logger.get().unwrap(); + match self.level { + Level::Error => unsafe { + logger.Error(&*self.message); + }, + Level::Warn => unsafe { + logger.Warn(&*self.message); + }, + Level::Debug => unsafe { + logger.Debug(&*self.message); + }, + Level::Trace => unsafe { + logger.Trace(&*self.message); + }, + Level::Info => unsafe { + logger.Info(&*self.message); + }, + } + } + + fn done(&self) -> Result<(), nsresult> { + Ok(()) + } +} diff --git a/services/sync/golden_gate/src/task.rs b/services/sync/golden_gate/src/task.rs new file mode 100644 index 0000000000..8cab21830b --- /dev/null +++ b/services/sync/golden_gate/src/task.rs @@ -0,0 +1,355 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::{fmt::Write, mem, result}; + +use atomic_refcell::AtomicRefCell; +use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder}; +use nserror::{nsresult, NS_ERROR_FAILURE}; +use nsstring::{nsACString, nsCString}; +use sync15::engine::{ApplyResults, BridgedEngine}; +use sync15::Guid; +use thin_vec::ThinVec; +use xpcom::{ + interfaces::{ + mozIBridgedSyncEngineApplyCallback, mozIBridgedSyncEngineCallback, nsIEventTarget, + }, + RefPtr, +}; + +use crate::error::{Error, Result}; +use crate::ferry::{Ferry, FerryResult}; + +/// A ferry task sends (or ferries) an operation to a bridged engine on a +/// background thread or task queue, and ferries back an optional result to +/// a callback. +pub struct FerryTask { + /// We want to ensure scheduled ferries can't block finalization of the underlying + /// store - we want a degree of confidence that closing the database will happen when + /// we want even if tasks are queued up to run on another thread. + /// We rely on the semantics of our BridgedEngines to help here: + /// * A bridged engine is expected to hold a weak reference to its store. + /// * Our LazyStore is the only thing holding a reference to the "real" store. + /// Thus, when our LazyStore asks our "real" store to close, we can be confident + /// a close will happen (ie, we assume that the real store will be able to unwrapp + /// the underlying sqlite `Connection` (using `Arc::try_unwrap`) and close it. + /// However, note that if an operation on the bridged engine is currently running, + /// we will block waiting for that operation to complete, so while this isn't + /// guaranteed to happen immediately, it should happen "soon enough". + engine: Box<dyn BridgedEngine>, + ferry: Ferry, + callback: ThreadPtrHandle<mozIBridgedSyncEngineCallback>, + result: AtomicRefCell<anyhow::Result<FerryResult>>, +} + +impl FerryTask { + /// Creates a task to fetch the engine's last sync time, in milliseconds. + #[inline] + pub fn for_last_sync( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::LastSync, callback) + } + + /// Creates a task to set the engine's last sync time, in milliseconds. + #[inline] + pub fn for_set_last_sync( + engine: Box<dyn BridgedEngine>, + last_sync_millis: i64, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback) + } + + /// Creates a task to fetch the engine's sync ID. + #[inline] + pub fn for_sync_id( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SyncId, callback) + } + + /// Creates a task to reset the engine's sync ID and all its local Sync + /// metadata. + #[inline] + pub fn for_reset_sync_id( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::ResetSyncId, callback) + } + + /// Creates a task to compare the bridged engine's local sync ID with + /// the `new_sync_id` from `meta/global`, and ferry back the final sync ID + /// to use. + #[inline] + pub fn for_ensure_current_sync_id( + engine: Box<dyn BridgedEngine>, + new_sync_id: &nsACString, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry( + engine, + Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()), + callback, + ) + } + + /// Creates a task to signal that the engine is about to sync. + #[inline] + pub fn for_sync_started( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SyncStarted, callback) + } + + /// Creates a task to store incoming records. + pub fn for_store_incoming( + engine: Box<dyn BridgedEngine>, + incoming_envelopes_json: &[nsCString], + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry( + engine, + Ferry::StoreIncoming(incoming_envelopes_json.to_vec()), + callback, + ) + } + + /// Creates a task to mark a subset of outgoing records as uploaded. This + /// may be called multiple times per sync, or not at all if there are no + /// records to upload. + pub fn for_set_uploaded( + engine: Box<dyn BridgedEngine>, + server_modified_millis: i64, + uploaded_ids: &[nsCString], + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + let uploaded_ids = uploaded_ids.iter().map(|id| Guid::from_slice(id)).collect(); + Self::with_ferry( + engine, + Ferry::SetUploaded(server_modified_millis, uploaded_ids), + callback, + ) + } + + /// Creates a task to signal that all records have been uploaded, and + /// the engine has been synced. This is called even if there were no + /// records uploaded. + #[inline] + pub fn for_sync_finished( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SyncFinished, callback) + } + + /// Creates a task to reset all local Sync state for the engine, without + /// erasing user data. + #[inline] + pub fn for_reset( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::Reset, callback) + } + + /// Creates a task to erase all local user data for the engine. + #[inline] + pub fn for_wipe( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::Wipe, callback) + } + + /// Creates a task for a ferry. The `callback` is bound to the current + /// thread, and will be called once, after the ferry returns from the + /// background thread. + fn with_ferry( + engine: Box<dyn BridgedEngine>, + ferry: Ferry, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + let name = ferry.name(); + Ok(FerryTask { + engine, + ferry, + callback: ThreadPtrHolder::new( + cstr!("mozIBridgedSyncEngineCallback"), + RefPtr::new(callback), + )?, + result: AtomicRefCell::new(Err(Error::DidNotRun(name).into())), + }) + } + + /// Dispatches the task to the given thread `target`. + pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> { + let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?; + // `may_block` schedules the task on the I/O thread pool, since we + // expect most operations to wait on I/O. + TaskRunnable::dispatch_with_options( + runnable, + target, + DispatchOptions::default().may_block(true), + )?; + Ok(()) + } + + /// Runs the task on the background thread. This is split out into its own + /// method to make error handling easier. + fn inner_run(&self) -> anyhow::Result<FerryResult> { + let engine = &self.engine; + Ok(match &self.ferry { + Ferry::LastSync => FerryResult::LastSync(engine.last_sync()?), + Ferry::SetLastSync(last_sync_millis) => { + engine.set_last_sync(*last_sync_millis)?; + FerryResult::default() + } + Ferry::SyncId => FerryResult::SyncId(engine.sync_id()?), + Ferry::ResetSyncId => FerryResult::AssignedSyncId(engine.reset_sync_id()?), + Ferry::EnsureCurrentSyncId(new_sync_id) => { + FerryResult::AssignedSyncId(engine.ensure_current_sync_id(new_sync_id)?) + } + Ferry::SyncStarted => { + engine.sync_started()?; + FerryResult::default() + } + Ferry::StoreIncoming(incoming_envelopes_json) => { + let incoming_envelopes = incoming_envelopes_json + .iter() + .map(|envelope| Ok(serde_json::from_slice(envelope)?)) + .collect::<Result<_>>()?; + + engine.store_incoming(incoming_envelopes)?; + FerryResult::default() + } + Ferry::SetUploaded(server_modified_millis, uploaded_ids) => { + engine.set_uploaded(*server_modified_millis, uploaded_ids.as_slice())?; + FerryResult::default() + } + Ferry::SyncFinished => { + engine.sync_finished()?; + FerryResult::default() + } + Ferry::Reset => { + engine.reset()?; + FerryResult::default() + } + Ferry::Wipe => { + engine.wipe()?; + FerryResult::default() + } + }) + } +} + +impl Task for FerryTask { + fn run(&self) { + *self.result.borrow_mut() = self.inner_run(); + } + + fn done(&self) -> result::Result<(), nsresult> { + let callback = self.callback.get().unwrap(); + match mem::replace( + &mut *self.result.borrow_mut(), + Err(Error::DidNotRun(self.ferry.name()).into()), + ) { + Ok(result) => unsafe { callback.HandleSuccess(result.into_variant().coerce()) }, + Err(err) => { + let mut message = nsCString::new(); + write!(message, "{err}").unwrap(); + unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) } + } + } + .to_result() + } +} + +/// An apply task ferries incoming records to an engine on a background +/// thread, and ferries back records to upload. It's separate from +/// `FerryTask` because its callback type is different. +pub struct ApplyTask { + engine: Box<dyn BridgedEngine>, + callback: ThreadPtrHandle<mozIBridgedSyncEngineApplyCallback>, + result: AtomicRefCell<anyhow::Result<Vec<String>>>, +} + +impl ApplyTask { + /// Returns the task name for debugging. + pub fn name() -> &'static str { + concat!(module_path!(), "apply") + } + + /// Runs the task on the background thread. + fn inner_run(&self) -> anyhow::Result<Vec<String>> { + let ApplyResults { + records: outgoing_records, + .. + } = self.engine.apply()?; + let outgoing_records_json = outgoing_records + .iter() + .map(|record| Ok(serde_json::to_string(record)?)) + .collect::<Result<_>>()?; + Ok(outgoing_records_json) + } + + /// Creates a task. The `callback` is bound to the current thread, and will + /// be called once, after the records are applied on the background thread. + pub fn new( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineApplyCallback, + ) -> Result<ApplyTask> { + Ok(ApplyTask { + engine, + callback: ThreadPtrHolder::new( + cstr!("mozIBridgedSyncEngineApplyCallback"), + RefPtr::new(callback), + )?, + result: AtomicRefCell::new(Err(Error::DidNotRun(Self::name()).into())), + }) + } + + /// Dispatches the task to the given thread `target`. + pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> { + let runnable = TaskRunnable::new(Self::name(), Box::new(self))?; + TaskRunnable::dispatch_with_options( + runnable, + target, + DispatchOptions::default().may_block(true), + )?; + Ok(()) + } +} + +impl Task for ApplyTask { + fn run(&self) { + *self.result.borrow_mut() = self.inner_run(); + } + + fn done(&self) -> result::Result<(), nsresult> { + let callback = self.callback.get().unwrap(); + match mem::replace( + &mut *self.result.borrow_mut(), + Err(Error::DidNotRun(Self::name()).into()), + ) { + Ok(envelopes) => { + let result = envelopes + .into_iter() + .map(nsCString::from) + .collect::<ThinVec<_>>(); + unsafe { callback.HandleSuccess(&result) } + } + Err(err) => { + let mut message = nsCString::new(); + write!(message, "{err}").unwrap(); + unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) } + } + } + .to_result() + } +} |