summaryrefslogtreecommitdiffstats
path: root/services/sync/golden_gate
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /services/sync/golden_gate
parentInitial commit. (diff)
downloadfirefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz
firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'services/sync/golden_gate')
-rw-r--r--services/sync/golden_gate/Cargo.toml25
-rw-r--r--services/sync/golden_gate/src/error.rs71
-rw-r--r--services/sync/golden_gate/src/ferry.rs74
-rw-r--r--services/sync/golden_gate/src/lib.rs119
-rw-r--r--services/sync/golden_gate/src/log.rs161
-rw-r--r--services/sync/golden_gate/src/task.rs355
6 files changed, 805 insertions, 0 deletions
diff --git a/services/sync/golden_gate/Cargo.toml b/services/sync/golden_gate/Cargo.toml
new file mode 100644
index 0000000000..3f94e1a1e9
--- /dev/null
+++ b/services/sync/golden_gate/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "golden_gate"
+description = "A bridge for wiring up Sync engines implemented in Rust"
+version = "0.1.0"
+authors = ["The Firefox Sync Developers <sync-team@mozilla.com>"]
+edition = "2018"
+license = "MPL-2.0"
+
+[dependencies]
+anyhow = "1"
+atomic_refcell = "0.1"
+cstr = "0.2"
+interrupt-support = "0.1"
+log = "0.4"
+moz_task = { path = "../../../xpcom/rust/moz_task" }
+nserror = { path = "../../../xpcom/rust/nserror" }
+nsstring = { path = "../../../xpcom/rust/nsstring" }
+serde_json = "1"
+storage_variant = { path = "../../../storage/variant" }
+sync15 = "0.1"
+xpcom = { path = "../../../xpcom/rust/xpcom" }
+
+[dependencies.thin-vec]
+version = "0.2.1"
+features = ["gecko-ffi"]
diff --git a/services/sync/golden_gate/src/error.rs b/services/sync/golden_gate/src/error.rs
new file mode 100644
index 0000000000..373d20756e
--- /dev/null
+++ b/services/sync/golden_gate/src/error.rs
@@ -0,0 +1,71 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::{error, fmt, result, str::Utf8Error};
+
+use nserror::{nsresult, NS_ERROR_INVALID_ARG, NS_ERROR_UNEXPECTED};
+use serde_json::Error as JsonError;
+
+/// A specialized `Result` type for Golden Gate.
+pub type Result<T> = result::Result<T, Error>;
+
+/// The error type for Golden Gate errors.
+#[derive(Debug)]
+pub enum Error {
+ /// A wrapped XPCOM error.
+ Nsresult(nsresult),
+
+ /// A ferry didn't run on the background task queue.
+ DidNotRun(&'static str),
+
+ /// A string contains invalid UTF-8 or JSON.
+ MalformedString(Box<dyn error::Error + Send + Sync + 'static>),
+}
+
+impl error::Error for Error {
+ fn source(&self) -> Option<&(dyn error::Error + 'static)> {
+ match self {
+ Error::MalformedString(error) => Some(error.as_ref()),
+ _ => None,
+ }
+ }
+}
+
+impl fmt::Display for Error {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ Error::Nsresult(result) => write!(f, "Operation failed with {}", result.error_name()),
+ Error::DidNotRun(what) => write!(f, "Failed to run `{what}` on background thread"),
+ Error::MalformedString(error) => error.fmt(f),
+ }
+ }
+}
+
+impl From<nsresult> for Error {
+ fn from(result: nsresult) -> Error {
+ Error::Nsresult(result)
+ }
+}
+
+impl From<Utf8Error> for Error {
+ fn from(error: Utf8Error) -> Error {
+ Error::MalformedString(error.into())
+ }
+}
+
+impl From<JsonError> for Error {
+ fn from(error: JsonError) -> Error {
+ Error::MalformedString(error.into())
+ }
+}
+
+impl From<Error> for nsresult {
+ fn from(error: Error) -> nsresult {
+ match error {
+ Error::DidNotRun(_) => NS_ERROR_UNEXPECTED,
+ Error::Nsresult(result) => result,
+ Error::MalformedString(_) => NS_ERROR_INVALID_ARG,
+ }
+ }
+}
diff --git a/services/sync/golden_gate/src/ferry.rs b/services/sync/golden_gate/src/ferry.rs
new file mode 100644
index 0000000000..99994811ab
--- /dev/null
+++ b/services/sync/golden_gate/src/ferry.rs
@@ -0,0 +1,74 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use nsstring::nsCString;
+use storage_variant::VariantType;
+use sync15::Guid;
+use xpcom::{interfaces::nsIVariant, RefPtr};
+
+/// An operation that runs on the background thread, and optionally passes a
+/// result to its callback.
+pub enum Ferry {
+ LastSync,
+ SetLastSync(i64),
+ SyncId,
+ ResetSyncId,
+ EnsureCurrentSyncId(String),
+ SyncStarted,
+ StoreIncoming(Vec<nsCString>),
+ SetUploaded(i64, Vec<Guid>),
+ SyncFinished,
+ Reset,
+ Wipe,
+}
+
+impl Ferry {
+ /// Returns the operation name for debugging and labeling the task
+ /// runnable.
+ pub fn name(&self) -> &'static str {
+ match self {
+ Ferry::LastSync => concat!(module_path!(), "getLastSync"),
+ Ferry::SetLastSync(_) => concat!(module_path!(), "setLastSync"),
+ Ferry::SyncId => concat!(module_path!(), "getSyncId"),
+ Ferry::ResetSyncId => concat!(module_path!(), "resetSyncId"),
+ Ferry::EnsureCurrentSyncId(_) => concat!(module_path!(), "ensureCurrentSyncId"),
+ Ferry::SyncStarted => concat!(module_path!(), "syncStarted"),
+ Ferry::StoreIncoming { .. } => concat!(module_path!(), "storeIncoming"),
+ Ferry::SetUploaded { .. } => concat!(module_path!(), "setUploaded"),
+ Ferry::SyncFinished => concat!(module_path!(), "syncFinished"),
+ Ferry::Reset => concat!(module_path!(), "reset"),
+ Ferry::Wipe => concat!(module_path!(), "wipe"),
+ }
+ }
+}
+
+/// The result of a ferry task, sent from the background thread back to the
+/// main thread. Results are converted to variants, and passed as arguments to
+/// `mozIBridgedSyncEngineCallback`s.
+pub enum FerryResult {
+ LastSync(i64),
+ SyncId(Option<String>),
+ AssignedSyncId(String),
+ Null,
+}
+
+impl Default for FerryResult {
+ fn default() -> Self {
+ FerryResult::Null
+ }
+}
+
+impl FerryResult {
+ /// Converts the result to an `nsIVariant` that can be passed as an
+ /// argument to `callback.handleResult()`.
+ pub fn into_variant(self) -> RefPtr<nsIVariant> {
+ match self {
+ FerryResult::LastSync(v) => v.into_variant(),
+ FerryResult::SyncId(Some(v)) => nsCString::from(v).into_variant(),
+ FerryResult::SyncId(None) => ().into_variant(),
+ FerryResult::AssignedSyncId(v) => nsCString::from(v).into_variant(),
+ FerryResult::Null => ().into_variant(),
+ }
+ }
+}
diff --git a/services/sync/golden_gate/src/lib.rs b/services/sync/golden_gate/src/lib.rs
new file mode 100644
index 0000000000..8da6524bd7
--- /dev/null
+++ b/services/sync/golden_gate/src/lib.rs
@@ -0,0 +1,119 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! **Golden Gate** 🌉 is a crate for bridging Desktop Sync to our suite of
+//! Rust sync and storage components. It connects Sync's `BridgedEngine` class
+//! to the Rust `BridgedEngine` trait via the `mozIBridgedSyncEngine` XPCOM
+//! interface.
+//!
+//! Due to limitations in implementing XPCOM interfaces for generic types,
+//! Golden Gate doesn't implement `mozIBridgedSyncEngine` directly. Instead,
+//! it provides helpers, called "ferries", for passing Sync records between
+//! JavaScript and Rust. The ferries also handle threading and type
+//! conversions.
+//!
+//! Here's a step-by-step guide for adding a new Rust Sync engine to Firefox.
+//!
+//! ## Step 1: Create your (XPCOM) bridge
+//!
+//! In your consuming crate, define a type for your `mozIBridgedSyncEngine`
+//! implementation. We'll call this type the **brige**. The bridge is
+//! responsible for exposing your Sync engine to XPIDL [^1], in a way that lets
+//! JavaScript call it.
+//!
+//! For your bridge type, you'll need to implement an xpcom interface with the
+//! `#[xpcom(implement(mozIBridgedSyncEngine), nonatomic)]` attribute then
+//! define `xpcom_method!()` stubs for the `mozIBridgedSyncEngine` methods. For
+//! more details about implementing XPCOM methods in Rust, check out the docs in
+//! `xpcom/rust/xpcom/src/method.rs`.
+//!
+//! You'll also need to add an entry for your bridge type to `components.conf`,
+//! and define C++ and Rust constructors for it, so that JavaScript code can
+//! create instances of it. Check out `NS_NewWebExtStorage` (and, in C++,
+//! `mozilla::extensions::storageapi::NewWebExtStorage`) and
+//! `NS_NewSyncedBookmarksMerger` (`mozilla::places::NewSyncedBookmarksMerger`
+//! in C++) for how to do this.
+//!
+//! [^1]: You can think of XPIDL as a souped-up C FFI, with richer types and a
+//! degree of type safety.
+//!
+//! ## Step 2: Add a background task queue to your bridge
+//!
+//! A task queue lets your engine do I/O, merging, and other syncing tasks on a
+//! background thread pool. This is important because database reads and writes
+//! can take an unpredictable amount of time. Doing these on the main thread can
+//! cause jank, and, in the worst case, lock up the browser UI for seconds at a
+//! time.
+//!
+//! The `moz_task` crate provides a `create_background_task_queue` function to
+//! do this. Once you have a queue, you can use it to call into your Rust
+//! engine. Golden Gate takes care of ferrying arguments back and forth across
+//! the thread boundary.
+//!
+//! Since it's a queue, ferries arrive in the order they're scheduled, so
+//! your engine's `store_incoming` method will always be called before `apply`,
+//! which is likewise called before `set_uploaded`. The thread manager scales
+//! the pool for you; you don't need to create or manage your own threads.
+//!
+//! ## Step 3: Create your Rust engine
+//!
+//! Next, you'll need to implement the Rust side of the bridge. This is a type
+//! that implements the `BridgedEngine` trait.
+//!
+//! Bridged engines handle storing incoming Sync records, merging changes,
+//! resolving conflicts, and fetching outgoing records for upload. Under the
+//! hood, your engine will hold either a database connection directly, or
+//! another object that does.
+//!
+//! Although outside the scope of Golden Gate, your engine will also likely
+//! expose a data storage API, for fetching, updating, and deleting items
+//! locally. Golden Gate provides the syncing layer on top of this local store.
+//!
+//! A `BridgedEngine` itself doesn't need to be `Send` or `Sync`, but the
+//! ferries require both, since they're calling into your bridge on the
+//! background task queue.
+//!
+//! In practice, this means your bridge will need to hold a thread-safe owned
+//! reference to the engine, via `Arc<Mutex<BridgedEngine>>`. In fact, this
+//! pattern is so common that Golden Gate implements `BridgedEngine` for any
+//! `Mutex<BridgedEngine>`, which automatically locks the mutex before calling
+//! into the engine.
+//!
+//! ## Step 4: Connect the bridge to the JavaScript and Rust sides
+//!
+//! On the JavaScript side, you'll need to subclass Sync's `BridgedEngine`
+//! class, and give it a handle to your XPCOM bridge. The base class has all the
+//! machinery for hooking up any `mozIBridgedSyncEngine` implementation so that
+//! Sync can drive it.
+//!
+//! On the Rust side, each `mozIBridgedSyncEngine` method should create a
+//! Golden Gate ferry, and dispatch it to the background task queue. The
+//! ferries correspond to the method names. For example, `ensureCurrentSyncId`
+//! should create a `Ferry::ensure_current_sync_id(...)`; `storeIncoming`, a
+//! `Ferry::store_incoming(...)`; and so on. This is mostly boilerplate.
+//!
+//! And that's it! Each ferry will, in turn, call into your Rust
+//! `BridgedEngine`, and send the results back to JavaScript.
+//!
+//! For an example of how all this works, including exposing a storage (not
+//! just syncing!) API to JS via XPIDL, check out `webext_storage::Bridge` for
+//! the `storage.sync` API!
+
+#[macro_use]
+extern crate cstr;
+
+pub mod error;
+mod ferry;
+pub mod log;
+pub mod task;
+
+pub use crate::log::LogSink;
+pub use error::{Error, Result};
+// Re-export items from `interrupt-support` and `sync15`, so that
+// consumers of `golden_gate` don't have to depend on them.
+pub use interrupt_support::{Interrupted, Interruptee};
+pub use sync15::bso::{IncomingBso, OutgoingBso};
+pub use sync15::engine::{ApplyResults, BridgedEngine};
+pub use sync15::Guid;
+pub use task::{ApplyTask, FerryTask};
diff --git a/services/sync/golden_gate/src/log.rs b/services/sync/golden_gate/src/log.rs
new file mode 100644
index 0000000000..de7fd0dfc3
--- /dev/null
+++ b/services/sync/golden_gate/src/log.rs
@@ -0,0 +1,161 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::fmt::{self, Write};
+
+use log::{Level, LevelFilter, Log, Metadata, Record};
+use moz_task::{Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder};
+use nserror::nsresult;
+use nsstring::nsString;
+use xpcom::{interfaces::mozIServicesLogSink, RefPtr};
+
+pub struct LogSink {
+ pub max_level: LevelFilter,
+ logger: Option<ThreadPtrHandle<mozIServicesLogSink>>,
+}
+
+impl Default for LogSink {
+ fn default() -> Self {
+ LogSink {
+ max_level: LevelFilter::Off,
+ logger: None,
+ }
+ }
+}
+
+impl LogSink {
+ /// Creates a log sink that adapts the Rust `log` crate to the Sync
+ /// `Log.sys.mjs` logger.
+ ///
+ /// This is copied from `bookmark_sync::Logger`. It would be nice to share
+ /// these, but, for now, we've just duplicated it to make prototyping
+ /// easier.
+ #[inline]
+ pub fn new(max_level: LevelFilter, logger: ThreadPtrHandle<mozIServicesLogSink>) -> LogSink {
+ LogSink {
+ max_level,
+ logger: Some(logger),
+ }
+ }
+
+ /// Creates a log sink using the given Services `logger` as the
+ /// underlying implementation. The `logger` will always be called
+ /// asynchronously on its owning thread; it doesn't need to be
+ /// thread-safe.
+ pub fn with_logger(logger: Option<&mozIServicesLogSink>) -> Result<LogSink, nsresult> {
+ Ok(if let Some(logger) = logger {
+ // Fetch the maximum log level while we're on the main thread, so
+ // that `LogSink::enabled()` can check it while on the background
+ // thread. Otherwise, we'd need to dispatch a `LogTask` for every
+ // log message, only to discard most of them when the task calls
+ // into the logger on the main thread.
+ let mut raw_max_level = 0i16;
+ let rv = unsafe { logger.GetMaxLevel(&mut raw_max_level) };
+ let max_level = if rv.succeeded() {
+ match raw_max_level {
+ mozIServicesLogSink::LEVEL_ERROR => LevelFilter::Error,
+ mozIServicesLogSink::LEVEL_WARN => LevelFilter::Warn,
+ mozIServicesLogSink::LEVEL_DEBUG => LevelFilter::Debug,
+ mozIServicesLogSink::LEVEL_TRACE => LevelFilter::Trace,
+ mozIServicesLogSink::LEVEL_INFO => LevelFilter::Info,
+ _ => LevelFilter::Off,
+ }
+ } else {
+ LevelFilter::Off
+ };
+ LogSink::new(
+ max_level,
+ ThreadPtrHolder::new(cstr!("mozIServicesLogSink"), RefPtr::new(logger))?,
+ )
+ } else {
+ LogSink::default()
+ })
+ }
+
+ /// Returns a reference to the underlying `mozIServicesLogSink`.
+ pub fn logger(&self) -> Option<&mozIServicesLogSink> {
+ self.logger.as_ref().and_then(|l| l.get())
+ }
+
+ /// Logs a message to the Sync logger, if one is set. This would be better
+ /// implemented as a macro, as Dogear does, so that we can pass variadic
+ /// arguments without manually invoking `fmt_args!()` every time we want
+ /// to log a message.
+ ///
+ /// The `log` crate's macros aren't suitable here, because those log to the
+ /// global logger. However, we don't want to set the global logger in our
+ /// crate, because that will log _everything_ that uses the Rust `log` crate
+ /// to the Sync logs, including WebRender and audio logging.
+ pub fn debug(&self, args: fmt::Arguments) {
+ let meta = Metadata::builder()
+ .level(Level::Debug)
+ .target(module_path!())
+ .build();
+ if self.enabled(&meta) {
+ self.log(&Record::builder().args(args).metadata(meta).build());
+ }
+ }
+}
+
+impl Log for LogSink {
+ #[inline]
+ fn enabled(&self, meta: &Metadata) -> bool {
+ self.logger.is_some() && meta.level() <= self.max_level
+ }
+
+ fn log(&self, record: &Record) {
+ if !self.enabled(record.metadata()) {
+ return;
+ }
+ if let Some(logger) = &self.logger {
+ let mut message = nsString::new();
+ if write!(message, "{}", record.args()).is_ok() {
+ let task = LogTask {
+ logger: logger.clone(),
+ level: record.metadata().level(),
+ message,
+ };
+ let _ = TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task))
+ .and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
+ }
+ }
+ }
+
+ fn flush(&self) {}
+}
+
+/// Logs a message to the mirror logger. This task is created on the background
+/// thread queue, and dispatched to the main thread.
+struct LogTask {
+ logger: ThreadPtrHandle<mozIServicesLogSink>,
+ level: Level,
+ message: nsString,
+}
+
+impl Task for LogTask {
+ fn run(&self) {
+ let logger = self.logger.get().unwrap();
+ match self.level {
+ Level::Error => unsafe {
+ logger.Error(&*self.message);
+ },
+ Level::Warn => unsafe {
+ logger.Warn(&*self.message);
+ },
+ Level::Debug => unsafe {
+ logger.Debug(&*self.message);
+ },
+ Level::Trace => unsafe {
+ logger.Trace(&*self.message);
+ },
+ Level::Info => unsafe {
+ logger.Info(&*self.message);
+ },
+ }
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ Ok(())
+ }
+}
diff --git a/services/sync/golden_gate/src/task.rs b/services/sync/golden_gate/src/task.rs
new file mode 100644
index 0000000000..8cab21830b
--- /dev/null
+++ b/services/sync/golden_gate/src/task.rs
@@ -0,0 +1,355 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::{fmt::Write, mem, result};
+
+use atomic_refcell::AtomicRefCell;
+use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder};
+use nserror::{nsresult, NS_ERROR_FAILURE};
+use nsstring::{nsACString, nsCString};
+use sync15::engine::{ApplyResults, BridgedEngine};
+use sync15::Guid;
+use thin_vec::ThinVec;
+use xpcom::{
+ interfaces::{
+ mozIBridgedSyncEngineApplyCallback, mozIBridgedSyncEngineCallback, nsIEventTarget,
+ },
+ RefPtr,
+};
+
+use crate::error::{Error, Result};
+use crate::ferry::{Ferry, FerryResult};
+
+/// A ferry task sends (or ferries) an operation to a bridged engine on a
+/// background thread or task queue, and ferries back an optional result to
+/// a callback.
+pub struct FerryTask {
+ /// We want to ensure scheduled ferries can't block finalization of the underlying
+ /// store - we want a degree of confidence that closing the database will happen when
+ /// we want even if tasks are queued up to run on another thread.
+ /// We rely on the semantics of our BridgedEngines to help here:
+ /// * A bridged engine is expected to hold a weak reference to its store.
+ /// * Our LazyStore is the only thing holding a reference to the "real" store.
+ /// Thus, when our LazyStore asks our "real" store to close, we can be confident
+ /// a close will happen (ie, we assume that the real store will be able to unwrapp
+ /// the underlying sqlite `Connection` (using `Arc::try_unwrap`) and close it.
+ /// However, note that if an operation on the bridged engine is currently running,
+ /// we will block waiting for that operation to complete, so while this isn't
+ /// guaranteed to happen immediately, it should happen "soon enough".
+ engine: Box<dyn BridgedEngine>,
+ ferry: Ferry,
+ callback: ThreadPtrHandle<mozIBridgedSyncEngineCallback>,
+ result: AtomicRefCell<anyhow::Result<FerryResult>>,
+}
+
+impl FerryTask {
+ /// Creates a task to fetch the engine's last sync time, in milliseconds.
+ #[inline]
+ pub fn for_last_sync(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::LastSync, callback)
+ }
+
+ /// Creates a task to set the engine's last sync time, in milliseconds.
+ #[inline]
+ pub fn for_set_last_sync(
+ engine: Box<dyn BridgedEngine>,
+ last_sync_millis: i64,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback)
+ }
+
+ /// Creates a task to fetch the engine's sync ID.
+ #[inline]
+ pub fn for_sync_id(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::SyncId, callback)
+ }
+
+ /// Creates a task to reset the engine's sync ID and all its local Sync
+ /// metadata.
+ #[inline]
+ pub fn for_reset_sync_id(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::ResetSyncId, callback)
+ }
+
+ /// Creates a task to compare the bridged engine's local sync ID with
+ /// the `new_sync_id` from `meta/global`, and ferry back the final sync ID
+ /// to use.
+ #[inline]
+ pub fn for_ensure_current_sync_id(
+ engine: Box<dyn BridgedEngine>,
+ new_sync_id: &nsACString,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(
+ engine,
+ Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()),
+ callback,
+ )
+ }
+
+ /// Creates a task to signal that the engine is about to sync.
+ #[inline]
+ pub fn for_sync_started(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::SyncStarted, callback)
+ }
+
+ /// Creates a task to store incoming records.
+ pub fn for_store_incoming(
+ engine: Box<dyn BridgedEngine>,
+ incoming_envelopes_json: &[nsCString],
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(
+ engine,
+ Ferry::StoreIncoming(incoming_envelopes_json.to_vec()),
+ callback,
+ )
+ }
+
+ /// Creates a task to mark a subset of outgoing records as uploaded. This
+ /// may be called multiple times per sync, or not at all if there are no
+ /// records to upload.
+ pub fn for_set_uploaded(
+ engine: Box<dyn BridgedEngine>,
+ server_modified_millis: i64,
+ uploaded_ids: &[nsCString],
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ let uploaded_ids = uploaded_ids.iter().map(|id| Guid::from_slice(id)).collect();
+ Self::with_ferry(
+ engine,
+ Ferry::SetUploaded(server_modified_millis, uploaded_ids),
+ callback,
+ )
+ }
+
+ /// Creates a task to signal that all records have been uploaded, and
+ /// the engine has been synced. This is called even if there were no
+ /// records uploaded.
+ #[inline]
+ pub fn for_sync_finished(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::SyncFinished, callback)
+ }
+
+ /// Creates a task to reset all local Sync state for the engine, without
+ /// erasing user data.
+ #[inline]
+ pub fn for_reset(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::Reset, callback)
+ }
+
+ /// Creates a task to erase all local user data for the engine.
+ #[inline]
+ pub fn for_wipe(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ Self::with_ferry(engine, Ferry::Wipe, callback)
+ }
+
+ /// Creates a task for a ferry. The `callback` is bound to the current
+ /// thread, and will be called once, after the ferry returns from the
+ /// background thread.
+ fn with_ferry(
+ engine: Box<dyn BridgedEngine>,
+ ferry: Ferry,
+ callback: &mozIBridgedSyncEngineCallback,
+ ) -> Result<FerryTask> {
+ let name = ferry.name();
+ Ok(FerryTask {
+ engine,
+ ferry,
+ callback: ThreadPtrHolder::new(
+ cstr!("mozIBridgedSyncEngineCallback"),
+ RefPtr::new(callback),
+ )?,
+ result: AtomicRefCell::new(Err(Error::DidNotRun(name).into())),
+ })
+ }
+
+ /// Dispatches the task to the given thread `target`.
+ pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> {
+ let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?;
+ // `may_block` schedules the task on the I/O thread pool, since we
+ // expect most operations to wait on I/O.
+ TaskRunnable::dispatch_with_options(
+ runnable,
+ target,
+ DispatchOptions::default().may_block(true),
+ )?;
+ Ok(())
+ }
+
+ /// Runs the task on the background thread. This is split out into its own
+ /// method to make error handling easier.
+ fn inner_run(&self) -> anyhow::Result<FerryResult> {
+ let engine = &self.engine;
+ Ok(match &self.ferry {
+ Ferry::LastSync => FerryResult::LastSync(engine.last_sync()?),
+ Ferry::SetLastSync(last_sync_millis) => {
+ engine.set_last_sync(*last_sync_millis)?;
+ FerryResult::default()
+ }
+ Ferry::SyncId => FerryResult::SyncId(engine.sync_id()?),
+ Ferry::ResetSyncId => FerryResult::AssignedSyncId(engine.reset_sync_id()?),
+ Ferry::EnsureCurrentSyncId(new_sync_id) => {
+ FerryResult::AssignedSyncId(engine.ensure_current_sync_id(new_sync_id)?)
+ }
+ Ferry::SyncStarted => {
+ engine.sync_started()?;
+ FerryResult::default()
+ }
+ Ferry::StoreIncoming(incoming_envelopes_json) => {
+ let incoming_envelopes = incoming_envelopes_json
+ .iter()
+ .map(|envelope| Ok(serde_json::from_slice(envelope)?))
+ .collect::<Result<_>>()?;
+
+ engine.store_incoming(incoming_envelopes)?;
+ FerryResult::default()
+ }
+ Ferry::SetUploaded(server_modified_millis, uploaded_ids) => {
+ engine.set_uploaded(*server_modified_millis, uploaded_ids.as_slice())?;
+ FerryResult::default()
+ }
+ Ferry::SyncFinished => {
+ engine.sync_finished()?;
+ FerryResult::default()
+ }
+ Ferry::Reset => {
+ engine.reset()?;
+ FerryResult::default()
+ }
+ Ferry::Wipe => {
+ engine.wipe()?;
+ FerryResult::default()
+ }
+ })
+ }
+}
+
+impl Task for FerryTask {
+ fn run(&self) {
+ *self.result.borrow_mut() = self.inner_run();
+ }
+
+ fn done(&self) -> result::Result<(), nsresult> {
+ let callback = self.callback.get().unwrap();
+ match mem::replace(
+ &mut *self.result.borrow_mut(),
+ Err(Error::DidNotRun(self.ferry.name()).into()),
+ ) {
+ Ok(result) => unsafe { callback.HandleSuccess(result.into_variant().coerce()) },
+ Err(err) => {
+ let mut message = nsCString::new();
+ write!(message, "{err}").unwrap();
+ unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) }
+ }
+ }
+ .to_result()
+ }
+}
+
+/// An apply task ferries incoming records to an engine on a background
+/// thread, and ferries back records to upload. It's separate from
+/// `FerryTask` because its callback type is different.
+pub struct ApplyTask {
+ engine: Box<dyn BridgedEngine>,
+ callback: ThreadPtrHandle<mozIBridgedSyncEngineApplyCallback>,
+ result: AtomicRefCell<anyhow::Result<Vec<String>>>,
+}
+
+impl ApplyTask {
+ /// Returns the task name for debugging.
+ pub fn name() -> &'static str {
+ concat!(module_path!(), "apply")
+ }
+
+ /// Runs the task on the background thread.
+ fn inner_run(&self) -> anyhow::Result<Vec<String>> {
+ let ApplyResults {
+ records: outgoing_records,
+ ..
+ } = self.engine.apply()?;
+ let outgoing_records_json = outgoing_records
+ .iter()
+ .map(|record| Ok(serde_json::to_string(record)?))
+ .collect::<Result<_>>()?;
+ Ok(outgoing_records_json)
+ }
+
+ /// Creates a task. The `callback` is bound to the current thread, and will
+ /// be called once, after the records are applied on the background thread.
+ pub fn new(
+ engine: Box<dyn BridgedEngine>,
+ callback: &mozIBridgedSyncEngineApplyCallback,
+ ) -> Result<ApplyTask> {
+ Ok(ApplyTask {
+ engine,
+ callback: ThreadPtrHolder::new(
+ cstr!("mozIBridgedSyncEngineApplyCallback"),
+ RefPtr::new(callback),
+ )?,
+ result: AtomicRefCell::new(Err(Error::DidNotRun(Self::name()).into())),
+ })
+ }
+
+ /// Dispatches the task to the given thread `target`.
+ pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> {
+ let runnable = TaskRunnable::new(Self::name(), Box::new(self))?;
+ TaskRunnable::dispatch_with_options(
+ runnable,
+ target,
+ DispatchOptions::default().may_block(true),
+ )?;
+ Ok(())
+ }
+}
+
+impl Task for ApplyTask {
+ fn run(&self) {
+ *self.result.borrow_mut() = self.inner_run();
+ }
+
+ fn done(&self) -> result::Result<(), nsresult> {
+ let callback = self.callback.get().unwrap();
+ match mem::replace(
+ &mut *self.result.borrow_mut(),
+ Err(Error::DidNotRun(Self::name()).into()),
+ ) {
+ Ok(envelopes) => {
+ let result = envelopes
+ .into_iter()
+ .map(nsCString::from)
+ .collect::<ThinVec<_>>();
+ unsafe { callback.HandleSuccess(&result) }
+ }
+ Err(err) => {
+ let mut message = nsCString::new();
+ write!(message, "{err}").unwrap();
+ unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) }
+ }
+ }
+ .to_result()
+ }
+}