/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use std::{fmt::Write, mem, result}; use atomic_refcell::AtomicRefCell; use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder}; use nserror::{nsresult, NS_ERROR_FAILURE}; use nsstring::{nsACString, nsCString}; use sync15::engine::{ApplyResults, BridgedEngine}; use sync15::Guid; use thin_vec::ThinVec; use xpcom::{ interfaces::{ mozIBridgedSyncEngineApplyCallback, mozIBridgedSyncEngineCallback, nsIEventTarget, }, RefPtr, }; use crate::error::{Error, Result}; use crate::ferry::{Ferry, FerryResult}; /// A ferry task sends (or ferries) an operation to a bridged engine on a /// background thread or task queue, and ferries back an optional result to /// a callback. pub struct FerryTask { /// We want to ensure scheduled ferries can't block finalization of the underlying /// store - we want a degree of confidence that closing the database will happen when /// we want even if tasks are queued up to run on another thread. /// We rely on the semantics of our BridgedEngines to help here: /// * A bridged engine is expected to hold a weak reference to its store. /// * Our LazyStore is the only thing holding a reference to the "real" store. /// Thus, when our LazyStore asks our "real" store to close, we can be confident /// a close will happen (ie, we assume that the real store will be able to unwrapp /// the underlying sqlite `Connection` (using `Arc::try_unwrap`) and close it. /// However, note that if an operation on the bridged engine is currently running, /// we will block waiting for that operation to complete, so while this isn't /// guaranteed to happen immediately, it should happen "soon enough". engine: Box, ferry: Ferry, callback: ThreadPtrHandle, result: AtomicRefCell>, } impl FerryTask { /// Creates a task to fetch the engine's last sync time, in milliseconds. #[inline] pub fn for_last_sync( engine: Box, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::LastSync, callback) } /// Creates a task to set the engine's last sync time, in milliseconds. #[inline] pub fn for_set_last_sync( engine: Box, last_sync_millis: i64, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback) } /// Creates a task to fetch the engine's sync ID. #[inline] pub fn for_sync_id( engine: Box, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::SyncId, callback) } /// Creates a task to reset the engine's sync ID and all its local Sync /// metadata. #[inline] pub fn for_reset_sync_id( engine: Box, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::ResetSyncId, callback) } /// Creates a task to compare the bridged engine's local sync ID with /// the `new_sync_id` from `meta/global`, and ferry back the final sync ID /// to use. #[inline] pub fn for_ensure_current_sync_id( engine: Box, new_sync_id: &nsACString, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry( engine, Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()), callback, ) } /// Creates a task to signal that the engine is about to sync. #[inline] pub fn for_sync_started( engine: Box, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::SyncStarted, callback) } /// Creates a task to store incoming records. pub fn for_store_incoming( engine: Box, incoming_envelopes_json: &[nsCString], callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry( engine, Ferry::StoreIncoming(incoming_envelopes_json.to_vec()), callback, ) } /// Creates a task to mark a subset of outgoing records as uploaded. This /// may be called multiple times per sync, or not at all if there are no /// records to upload. pub fn for_set_uploaded( engine: Box, server_modified_millis: i64, uploaded_ids: &[nsCString], callback: &mozIBridgedSyncEngineCallback, ) -> Result { let uploaded_ids = uploaded_ids.iter().map(|id| Guid::from_slice(id)).collect(); Self::with_ferry( engine, Ferry::SetUploaded(server_modified_millis, uploaded_ids), callback, ) } /// Creates a task to signal that all records have been uploaded, and /// the engine has been synced. This is called even if there were no /// records uploaded. #[inline] pub fn for_sync_finished( engine: Box, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::SyncFinished, callback) } /// Creates a task to reset all local Sync state for the engine, without /// erasing user data. #[inline] pub fn for_reset( engine: Box, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::Reset, callback) } /// Creates a task to erase all local user data for the engine. #[inline] pub fn for_wipe( engine: Box, callback: &mozIBridgedSyncEngineCallback, ) -> Result { Self::with_ferry(engine, Ferry::Wipe, callback) } /// Creates a task for a ferry. The `callback` is bound to the current /// thread, and will be called once, after the ferry returns from the /// background thread. fn with_ferry( engine: Box, ferry: Ferry, callback: &mozIBridgedSyncEngineCallback, ) -> Result { let name = ferry.name(); Ok(FerryTask { engine, ferry, callback: ThreadPtrHolder::new( cstr!("mozIBridgedSyncEngineCallback"), RefPtr::new(callback), )?, result: AtomicRefCell::new(Err(Error::DidNotRun(name).into())), }) } /// Dispatches the task to the given thread `target`. pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> { let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?; // `may_block` schedules the task on the I/O thread pool, since we // expect most operations to wait on I/O. TaskRunnable::dispatch_with_options( runnable, target, DispatchOptions::default().may_block(true), )?; Ok(()) } /// Runs the task on the background thread. This is split out into its own /// method to make error handling easier. fn inner_run(&self) -> anyhow::Result { let engine = &self.engine; Ok(match &self.ferry { Ferry::LastSync => FerryResult::LastSync(engine.last_sync()?), Ferry::SetLastSync(last_sync_millis) => { engine.set_last_sync(*last_sync_millis)?; FerryResult::default() } Ferry::SyncId => FerryResult::SyncId(engine.sync_id()?), Ferry::ResetSyncId => FerryResult::AssignedSyncId(engine.reset_sync_id()?), Ferry::EnsureCurrentSyncId(new_sync_id) => { FerryResult::AssignedSyncId(engine.ensure_current_sync_id(new_sync_id)?) } Ferry::SyncStarted => { engine.sync_started()?; FerryResult::default() } Ferry::StoreIncoming(incoming_envelopes_json) => { let incoming_envelopes = incoming_envelopes_json .iter() .map(|envelope| Ok(serde_json::from_slice(envelope)?)) .collect::>()?; engine.store_incoming(incoming_envelopes)?; FerryResult::default() } Ferry::SetUploaded(server_modified_millis, uploaded_ids) => { engine.set_uploaded(*server_modified_millis, uploaded_ids.as_slice())?; FerryResult::default() } Ferry::SyncFinished => { engine.sync_finished()?; FerryResult::default() } Ferry::Reset => { engine.reset()?; FerryResult::default() } Ferry::Wipe => { engine.wipe()?; FerryResult::default() } }) } } impl Task for FerryTask { fn run(&self) { *self.result.borrow_mut() = self.inner_run(); } fn done(&self) -> result::Result<(), nsresult> { let callback = self.callback.get().unwrap(); match mem::replace( &mut *self.result.borrow_mut(), Err(Error::DidNotRun(self.ferry.name()).into()), ) { Ok(result) => unsafe { callback.HandleSuccess(result.into_variant().coerce()) }, Err(err) => { let mut message = nsCString::new(); write!(message, "{err}").unwrap(); unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) } } } .to_result() } } /// An apply task ferries incoming records to an engine on a background /// thread, and ferries back records to upload. It's separate from /// `FerryTask` because its callback type is different. pub struct ApplyTask { engine: Box, callback: ThreadPtrHandle, result: AtomicRefCell>>, } impl ApplyTask { /// Returns the task name for debugging. pub fn name() -> &'static str { concat!(module_path!(), "apply") } /// Runs the task on the background thread. fn inner_run(&self) -> anyhow::Result> { let ApplyResults { records: outgoing_records, .. } = self.engine.apply()?; let outgoing_records_json = outgoing_records .iter() .map(|record| Ok(serde_json::to_string(record)?)) .collect::>()?; Ok(outgoing_records_json) } /// Creates a task. The `callback` is bound to the current thread, and will /// be called once, after the records are applied on the background thread. pub fn new( engine: Box, callback: &mozIBridgedSyncEngineApplyCallback, ) -> Result { Ok(ApplyTask { engine, callback: ThreadPtrHolder::new( cstr!("mozIBridgedSyncEngineApplyCallback"), RefPtr::new(callback), )?, result: AtomicRefCell::new(Err(Error::DidNotRun(Self::name()).into())), }) } /// Dispatches the task to the given thread `target`. pub fn dispatch(self, target: &nsIEventTarget) -> Result<()> { let runnable = TaskRunnable::new(Self::name(), Box::new(self))?; TaskRunnable::dispatch_with_options( runnable, target, DispatchOptions::default().may_block(true), )?; Ok(()) } } impl Task for ApplyTask { fn run(&self) { *self.result.borrow_mut() = self.inner_run(); } fn done(&self) -> result::Result<(), nsresult> { let callback = self.callback.get().unwrap(); match mem::replace( &mut *self.result.borrow_mut(), Err(Error::DidNotRun(Self::name()).into()), ) { Ok(envelopes) => { let result = envelopes .into_iter() .map(nsCString::from) .collect::>(); unsafe { callback.HandleSuccess(&result) } } Err(err) => { let mut message = nsCString::new(); write!(message, "{err}").unwrap(); unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) } } } .to_result() } }