diff options
Diffstat (limited to 'services/sync/golden_gate/src/task.rs')
-rw-r--r-- | services/sync/golden_gate/src/task.rs | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/services/sync/golden_gate/src/task.rs b/services/sync/golden_gate/src/task.rs new file mode 100644 index 0000000000..8cab21830b --- /dev/null +++ b/services/sync/golden_gate/src/task.rs @@ -0,0 +1,355 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::{fmt::Write, mem, result}; + +use atomic_refcell::AtomicRefCell; +use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder}; +use nserror::{nsresult, NS_ERROR_FAILURE}; +use nsstring::{nsACString, nsCString}; +use sync15::engine::{ApplyResults, BridgedEngine}; +use sync15::Guid; +use thin_vec::ThinVec; +use xpcom::{ + interfaces::{ + mozIBridgedSyncEngineApplyCallback, mozIBridgedSyncEngineCallback, nsIEventTarget, + }, + RefPtr, +}; + +use crate::error::{Error, Result}; +use crate::ferry::{Ferry, FerryResult}; + +/// A ferry task sends (or ferries) an operation to a bridged engine on a +/// background thread or task queue, and ferries back an optional result to +/// a callback. +pub struct FerryTask { + /// We want to ensure scheduled ferries can't block finalization of the underlying + /// store - we want a degree of confidence that closing the database will happen when + /// we want even if tasks are queued up to run on another thread. + /// We rely on the semantics of our BridgedEngines to help here: + /// * A bridged engine is expected to hold a weak reference to its store. + /// * Our LazyStore is the only thing holding a reference to the "real" store. + /// Thus, when our LazyStore asks our "real" store to close, we can be confident + /// a close will happen (ie, we assume that the real store will be able to unwrapp + /// the underlying sqlite `Connection` (using `Arc::try_unwrap`) and close it. + /// However, note that if an operation on the bridged engine is currently running, + /// we will block waiting for that operation to complete, so while this isn't + /// guaranteed to happen immediately, it should happen "soon enough". + engine: Box<dyn BridgedEngine>, + ferry: Ferry, + callback: ThreadPtrHandle<mozIBridgedSyncEngineCallback>, + result: AtomicRefCell<anyhow::Result<FerryResult>>, +} + +impl FerryTask { + /// Creates a task to fetch the engine's last sync time, in milliseconds. + #[inline] + pub fn for_last_sync( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::LastSync, callback) + } + + /// Creates a task to set the engine's last sync time, in milliseconds. + #[inline] + pub fn for_set_last_sync( + engine: Box<dyn BridgedEngine>, + last_sync_millis: i64, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback) + } + + /// Creates a task to fetch the engine's sync ID. + #[inline] + pub fn for_sync_id( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SyncId, callback) + } + + /// Creates a task to reset the engine's sync ID and all its local Sync + /// metadata. + #[inline] + pub fn for_reset_sync_id( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::ResetSyncId, callback) + } + + /// Creates a task to compare the bridged engine's local sync ID with + /// the `new_sync_id` from `meta/global`, and ferry back the final sync ID + /// to use. + #[inline] + pub fn for_ensure_current_sync_id( + engine: Box<dyn BridgedEngine>, + new_sync_id: &nsACString, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry( + engine, + Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()), + callback, + ) + } + + /// Creates a task to signal that the engine is about to sync. + #[inline] + pub fn for_sync_started( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SyncStarted, callback) + } + + /// Creates a task to store incoming records. + pub fn for_store_incoming( + engine: Box<dyn BridgedEngine>, + incoming_envelopes_json: &[nsCString], + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry( + engine, + Ferry::StoreIncoming(incoming_envelopes_json.to_vec()), + callback, + ) + } + + /// Creates a task to mark a subset of outgoing records as uploaded. This + /// may be called multiple times per sync, or not at all if there are no + /// records to upload. + pub fn for_set_uploaded( + engine: Box<dyn BridgedEngine>, + server_modified_millis: i64, + uploaded_ids: &[nsCString], + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + let uploaded_ids = uploaded_ids.iter().map(|id| Guid::from_slice(id)).collect(); + Self::with_ferry( + engine, + Ferry::SetUploaded(server_modified_millis, uploaded_ids), + callback, + ) + } + + /// Creates a task to signal that all records have been uploaded, and + /// the engine has been synced. This is called even if there were no + /// records uploaded. + #[inline] + pub fn for_sync_finished( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::SyncFinished, callback) + } + + /// Creates a task to reset all local Sync state for the engine, without + /// erasing user data. + #[inline] + pub fn for_reset( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::Reset, callback) + } + + /// Creates a task to erase all local user data for the engine. + #[inline] + pub fn for_wipe( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + Self::with_ferry(engine, Ferry::Wipe, callback) + } + + /// Creates a task for a ferry. The `callback` is bound to the current + /// thread, and will be called once, after the ferry returns from the + /// background thread. + fn with_ferry( + engine: Box<dyn BridgedEngine>, + ferry: Ferry, + callback: &mozIBridgedSyncEngineCallback, + ) -> Result<FerryTask> { + let name = ferry.name(); + Ok(FerryTask { + engine, + ferry, + callback: ThreadPtrHolder::new( + cstr!("mozIBridgedSyncEngineCallback"), + RefPtr::new(callback), + )?, + result: AtomicRefCell::new(Err(Error::DidNotRun(name).into())), + }) + } + + /// Dispatches the task to the given thread `target`. + pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> { + let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?; + // `may_block` schedules the task on the I/O thread pool, since we + // expect most operations to wait on I/O. + TaskRunnable::dispatch_with_options( + runnable, + target, + DispatchOptions::default().may_block(true), + )?; + Ok(()) + } + + /// Runs the task on the background thread. This is split out into its own + /// method to make error handling easier. + fn inner_run(&self) -> anyhow::Result<FerryResult> { + let engine = &self.engine; + Ok(match &self.ferry { + Ferry::LastSync => FerryResult::LastSync(engine.last_sync()?), + Ferry::SetLastSync(last_sync_millis) => { + engine.set_last_sync(*last_sync_millis)?; + FerryResult::default() + } + Ferry::SyncId => FerryResult::SyncId(engine.sync_id()?), + Ferry::ResetSyncId => FerryResult::AssignedSyncId(engine.reset_sync_id()?), + Ferry::EnsureCurrentSyncId(new_sync_id) => { + FerryResult::AssignedSyncId(engine.ensure_current_sync_id(new_sync_id)?) + } + Ferry::SyncStarted => { + engine.sync_started()?; + FerryResult::default() + } + Ferry::StoreIncoming(incoming_envelopes_json) => { + let incoming_envelopes = incoming_envelopes_json + .iter() + .map(|envelope| Ok(serde_json::from_slice(envelope)?)) + .collect::<Result<_>>()?; + + engine.store_incoming(incoming_envelopes)?; + FerryResult::default() + } + Ferry::SetUploaded(server_modified_millis, uploaded_ids) => { + engine.set_uploaded(*server_modified_millis, uploaded_ids.as_slice())?; + FerryResult::default() + } + Ferry::SyncFinished => { + engine.sync_finished()?; + FerryResult::default() + } + Ferry::Reset => { + engine.reset()?; + FerryResult::default() + } + Ferry::Wipe => { + engine.wipe()?; + FerryResult::default() + } + }) + } +} + +impl Task for FerryTask { + fn run(&self) { + *self.result.borrow_mut() = self.inner_run(); + } + + fn done(&self) -> result::Result<(), nsresult> { + let callback = self.callback.get().unwrap(); + match mem::replace( + &mut *self.result.borrow_mut(), + Err(Error::DidNotRun(self.ferry.name()).into()), + ) { + Ok(result) => unsafe { callback.HandleSuccess(result.into_variant().coerce()) }, + Err(err) => { + let mut message = nsCString::new(); + write!(message, "{err}").unwrap(); + unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) } + } + } + .to_result() + } +} + +/// An apply task ferries incoming records to an engine on a background +/// thread, and ferries back records to upload. It's separate from +/// `FerryTask` because its callback type is different. +pub struct ApplyTask { + engine: Box<dyn BridgedEngine>, + callback: ThreadPtrHandle<mozIBridgedSyncEngineApplyCallback>, + result: AtomicRefCell<anyhow::Result<Vec<String>>>, +} + +impl ApplyTask { + /// Returns the task name for debugging. + pub fn name() -> &'static str { + concat!(module_path!(), "apply") + } + + /// Runs the task on the background thread. + fn inner_run(&self) -> anyhow::Result<Vec<String>> { + let ApplyResults { + records: outgoing_records, + .. + } = self.engine.apply()?; + let outgoing_records_json = outgoing_records + .iter() + .map(|record| Ok(serde_json::to_string(record)?)) + .collect::<Result<_>>()?; + Ok(outgoing_records_json) + } + + /// Creates a task. The `callback` is bound to the current thread, and will + /// be called once, after the records are applied on the background thread. + pub fn new( + engine: Box<dyn BridgedEngine>, + callback: &mozIBridgedSyncEngineApplyCallback, + ) -> Result<ApplyTask> { + Ok(ApplyTask { + engine, + callback: ThreadPtrHolder::new( + cstr!("mozIBridgedSyncEngineApplyCallback"), + RefPtr::new(callback), + )?, + result: AtomicRefCell::new(Err(Error::DidNotRun(Self::name()).into())), + }) + } + + /// Dispatches the task to the given thread `target`. + pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> { + let runnable = TaskRunnable::new(Self::name(), Box::new(self))?; + TaskRunnable::dispatch_with_options( + runnable, + target, + DispatchOptions::default().may_block(true), + )?; + Ok(()) + } +} + +impl Task for ApplyTask { + fn run(&self) { + *self.result.borrow_mut() = self.inner_run(); + } + + fn done(&self) -> result::Result<(), nsresult> { + let callback = self.callback.get().unwrap(); + match mem::replace( + &mut *self.result.borrow_mut(), + Err(Error::DidNotRun(Self::name()).into()), + ) { + Ok(envelopes) => { + let result = envelopes + .into_iter() + .map(nsCString::from) + .collect::<ThinVec<_>>(); + unsafe { callback.HandleSuccess(&result) } + } + Err(err) => { + let mut message = nsCString::new(); + write!(message, "{err}").unwrap(); + unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) } + } + } + .to_result() + } +} |