diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/sync15/src/clients_engine | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/sync15/src/clients_engine')
-rw-r--r-- | third_party/rust/sync15/src/clients_engine/engine.rs | 814 | ||||
-rw-r--r-- | third_party/rust/sync15/src/clients_engine/mod.rs | 93 | ||||
-rw-r--r-- | third_party/rust/sync15/src/clients_engine/record.rs | 124 | ||||
-rw-r--r-- | third_party/rust/sync15/src/clients_engine/ser.rs | 125 |
4 files changed, 1156 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/clients_engine/engine.rs b/third_party/rust/sync15/src/clients_engine/engine.rs new file mode 100644 index 0000000000..f3d6242126 --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/engine.rs @@ -0,0 +1,814 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::{HashMap, HashSet}; + +use crate::bso::{IncomingKind, OutgoingBso, OutgoingEnvelope}; +use crate::client::{ + CollState, CollectionKeys, CollectionUpdate, GlobalState, InfoConfiguration, + Sync15StorageClient, +}; +use crate::client_types::{ClientData, RemoteClient}; +use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset}; +use crate::{error::Result, Guid, KeyBundle}; +use interrupt_support::Interruptee; + +use super::{ + record::{ClientRecord, CommandRecord}, + ser::shrink_to_fit, + Command, CommandProcessor, CommandStatus, CLIENTS_TTL, +}; + +const COLLECTION_NAME: &str = "clients"; + +/// The driver for the clients engine. Internal; split out from the `Engine` +/// struct to make testing easier. +struct Driver<'a> { + command_processor: &'a dyn CommandProcessor, + interruptee: &'a dyn Interruptee, + config: &'a InfoConfiguration, + recent_clients: HashMap<String, RemoteClient>, +} + +impl<'a> Driver<'a> { + fn new( + command_processor: &'a dyn CommandProcessor, + interruptee: &'a dyn Interruptee, + config: &'a InfoConfiguration, + ) -> Driver<'a> { + Driver { + command_processor, + interruptee, + config, + recent_clients: HashMap::new(), + } + } + + fn note_recent_client(&mut self, client: &ClientRecord) { + self.recent_clients.insert(client.id.clone(), client.into()); + } + + fn sync( + &mut self, + inbound: IncomingChangeset, + should_refresh_client: bool, + ) -> Result<OutgoingChangeset> { + let mut outgoing = OutgoingChangeset::new(COLLECTION_NAME, inbound.timestamp); + outgoing.timestamp = inbound.timestamp; + + self.interruptee.err_if_interrupted()?; + let outgoing_commands = self.command_processor.fetch_outgoing_commands()?; + + let mut has_own_client_record = false; + + for bso in inbound.changes { + self.interruptee.err_if_interrupted()?; + + let content = bso.into_content(); + + let client: ClientRecord = match content.kind { + IncomingKind::Malformed => { + log::debug!("Error unpacking record"); + continue; + } + IncomingKind::Tombstone => { + log::debug!("Record has been deleted; skipping..."); + continue; + } + IncomingKind::Content(client) => client, + }; + + if client.id == self.command_processor.settings().fxa_device_id { + log::debug!("Found my record on the server"); + // If we see our own client record, apply any incoming commands, + // remove them from the list, and reupload the record. Any + // commands that we don't understand also go back in the list. + // https://github.com/mozilla/application-services/issues/1800 + // tracks if that's the right thing to do. + has_own_client_record = true; + let mut current_client_record = self.current_client_record(); + for c in &client.commands { + let status = match c.as_command() { + Some(command) => self.command_processor.apply_incoming_command(command)?, + None => CommandStatus::Unsupported, + }; + match status { + CommandStatus::Applied => {} + CommandStatus::Ignored => { + log::debug!("Ignored command {:?}", c); + } + CommandStatus::Unsupported => { + log::warn!("Don't know how to apply command {:?}", c); + current_client_record.commands.push(c.clone()); + } + } + } + + // The clients collection has a hard limit on the payload size, + // after which the server starts rejecting our records. Large + // command lists can cause us to exceed this, so we truncate + // the list. + shrink_to_fit( + &mut current_client_record.commands, + self.memcache_max_record_payload_size(), + )?; + + // Add the new client record to our map of recently synced + // clients, so that downstream consumers like synced tabs can + // access them. + self.note_recent_client(¤t_client_record); + + // We periodically upload our own client record, even if it + // doesn't change, to keep it fresh. + if should_refresh_client || client != current_client_record { + log::debug!("Will update our client record on the server"); + let envelope = OutgoingEnvelope { + id: content.envelope.id, + ttl: Some(CLIENTS_TTL), + ..Default::default() + }; + outgoing + .changes + .push(OutgoingBso::from_content(envelope, current_client_record)?); + } + } else { + // Add the other client to our map of recently synced clients. + self.note_recent_client(&client); + + // Bail if we don't have any outgoing commands to write into + // the other client's record. + if outgoing_commands.is_empty() { + continue; + } + + // Determine if we have new commands, that aren't already in the + // client's command list. + let current_commands: HashSet<Command> = client + .commands + .iter() + .filter_map(|c| c.as_command()) + .collect(); + let mut new_outgoing_commands = outgoing_commands + .difference(¤t_commands) + .cloned() + .collect::<Vec<_>>(); + // Sort, to ensure deterministic ordering for tests. + new_outgoing_commands.sort(); + let mut new_client = client.clone(); + new_client + .commands + .extend(new_outgoing_commands.into_iter().map(CommandRecord::from)); + if new_client.commands.len() == client.commands.len() { + continue; + } + + // Hooray, we added new commands! Make sure the record still + // fits in the maximum record size, or the server will reject + // our upload. + shrink_to_fit( + &mut new_client.commands, + self.memcache_max_record_payload_size(), + )?; + + let envelope = OutgoingEnvelope { + id: content.envelope.id, + ttl: Some(CLIENTS_TTL), + ..Default::default() + }; + outgoing + .changes + .push(OutgoingBso::from_content(envelope, new_client)?); + } + } + + // Upload a record for our own client, if we didn't replace it already. + if !has_own_client_record { + let current_client_record = self.current_client_record(); + self.note_recent_client(¤t_client_record); + let envelope = OutgoingEnvelope { + id: Guid::new(¤t_client_record.id), + ttl: Some(CLIENTS_TTL), + ..Default::default() + }; + outgoing + .changes + .push(OutgoingBso::from_content(envelope, current_client_record)?); + } + + Ok(outgoing) + } + + /// Builds a fresh client record for this device. + fn current_client_record(&self) -> ClientRecord { + let settings = self.command_processor.settings(); + ClientRecord { + id: settings.fxa_device_id.clone(), + name: settings.device_name.clone(), + typ: settings.device_type.into(), + commands: Vec::new(), + fxa_device_id: Some(settings.fxa_device_id.clone()), + version: None, + protocols: vec!["1.5".into()], + form_factor: None, + os: None, + app_package: None, + application: None, + device: None, + } + } + + fn max_record_payload_size(&self) -> usize { + let payload_max = self.config.max_record_payload_bytes; + if payload_max <= self.config.max_post_bytes { + self.config.max_post_bytes.saturating_sub(4096) + } else { + payload_max + } + } + + /// Collections stored in memcached ("tabs", "clients" or "meta") have a + /// different max size than ones stored in the normal storage server db. + /// In practice, the real limit here is 1M (bug 1300451 comment 40), but + /// there's overhead involved that is hard to calculate on the client, so we + /// use 512k to be safe (at the recommendation of the server team). Note + /// that if the server reports a lower limit (via info/configuration), we + /// respect that limit instead. See also bug 1403052. + /// XXX - the above comment is stale and refers to the world before the + /// move to spanner and the rust sync server. + fn memcache_max_record_payload_size(&self) -> usize { + self.max_record_payload_size().min(512 * 1024) + } +} + +pub struct Engine<'a> { + pub command_processor: &'a dyn CommandProcessor, + pub interruptee: &'a dyn Interruptee, + pub recent_clients: HashMap<String, RemoteClient>, +} + +impl<'a> Engine<'a> { + /// Creates a new clients engine that delegates to the given command + /// processor to apply incoming commands. + pub fn new<'b>( + command_processor: &'b dyn CommandProcessor, + interruptee: &'b dyn Interruptee, + ) -> Engine<'b> { + Engine { + command_processor, + interruptee, + recent_clients: HashMap::new(), + } + } + + /// Syncs the clients collection. This works a little differently than + /// other collections: + /// + /// 1. It can't be disabled or declined. + /// 2. The sync ID and last sync time aren't meaningful, since we always + /// fetch all client records on every sync. As such, the + /// `LocalCollStateMachine` that we use for other engines doesn't + /// apply to it. + /// 3. It doesn't persist state directly, but relies on the sync manager + /// to persist device settings, and process commands. + /// 4. Failing to sync the clients collection is fatal, and aborts the + /// sync. + /// + /// For these reasons, we implement this engine directly in the `sync15` + /// crate, and provide a specialized `sync` method instead of implementing + /// `sync15::Store`. + pub fn sync( + &mut self, + storage_client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + should_refresh_client: bool, + ) -> Result<()> { + log::info!("Syncing collection clients"); + + let coll_keys = CollectionKeys::from_encrypted_payload( + global_state.keys.clone(), + global_state.keys_timestamp, + root_sync_key, + )?; + let mut coll_state = CollState { + config: global_state.config.clone(), + last_modified: global_state + .collections + .get(COLLECTION_NAME) + .cloned() + .unwrap_or_default(), + key: coll_keys.key_for_collection(COLLECTION_NAME).clone(), + }; + + let inbound = self.fetch_incoming(storage_client, &mut coll_state)?; + + let mut driver = Driver::new( + self.command_processor, + self.interruptee, + &global_state.config, + ); + + let outgoing = driver.sync(inbound, should_refresh_client)?; + self.recent_clients = driver.recent_clients; + + coll_state.last_modified = outgoing.timestamp; + + self.interruptee.err_if_interrupted()?; + let upload_info = + CollectionUpdate::new_from_changeset(storage_client, &coll_state, outgoing, true)? + .upload()?; + + log::info!( + "Upload success ({} records success, {} records failed)", + upload_info.successful_ids.len(), + upload_info.failed_ids.len() + ); + + log::info!("Finished syncing clients"); + Ok(()) + } + + fn fetch_incoming( + &self, + storage_client: &Sync15StorageClient, + coll_state: &mut CollState, + ) -> Result<IncomingChangeset> { + // Note that, unlike other stores, we always fetch the full collection + // on every sync, so `inbound` will return all clients, not just the + // ones that changed since the last sync. + let coll_request = CollectionRequest::new(COLLECTION_NAME).full(); + + self.interruptee.err_if_interrupted()?; + let inbound = crate::client::fetch_incoming(storage_client, coll_state, &coll_request)?; + + Ok(inbound) + } + + pub fn local_client_id(&self) -> String { + // Bit dirty but it's the easiest way to reach to our own + // device ID without refactoring the whole sync manager crate. + self.command_processor.settings().fxa_device_id.clone() + } + + pub fn get_client_data(&self) -> ClientData { + ClientData { + local_client_id: self.local_client_id(), + recent_clients: self.recent_clients.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::super::{CommandStatus, DeviceType, Settings}; + use super::*; + use crate::bso::IncomingBso; + use crate::ServerTimestamp; + use anyhow::Result; + use interrupt_support::NeverInterrupts; + use serde_json::{json, Value}; + use std::iter::zip; + + struct TestProcessor { + settings: Settings, + outgoing_commands: HashSet<Command>, + } + + impl CommandProcessor for TestProcessor { + fn settings(&self) -> &Settings { + &self.settings + } + + fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus> { + Ok(if let Command::Reset(name) = command { + if name == "forms" { + CommandStatus::Unsupported + } else { + CommandStatus::Applied + } + } else { + CommandStatus::Ignored + }) + } + + fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>> { + Ok(self.outgoing_commands.clone()) + } + } + + fn inbound_from_clients(clients: Value) -> IncomingChangeset { + if let Value::Array(clients) = clients { + let changes = clients + .into_iter() + .map(IncomingBso::from_test_content) + .collect(); + IncomingChangeset { + changes, + timestamp: ServerTimestamp(0), + collection: COLLECTION_NAME.into(), + } + } else { + unreachable!("`clients` must be an array of client records") + } + } + + #[test] + fn test_clients_sync() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [ + Command::Wipe("bookmarks".into()), + Command::Reset("history".into()), + ] + .iter() + .cloned() + .collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let inbound = inbound_from_clients(json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "deviceCCCCCC", + "name": "Fenix", + "type": "mobile", + "commands": [], + "fxaDeviceId": "deviceCCCCCC", + }, { + "id": "deviceAAAAAA", + "name": "Laptop with a different name", + "type": "desktop", + "commands": [{ + "command": "wipeEngine", + "args": ["logins"] + }, { + "command": "displayURI", + "args": ["http://example.com", "Fennec", "Example page"], + "flowID": "flooooooooow", + }, { + "command": "resetEngine", + "args": ["forms"], + }, { + "command": "logout", + "args": [], + }], + "fxaDeviceId": "deviceAAAAAA", + }])); + + // Passing false for `should_refresh_client` - it should be ignored + // because we've changed the commands. + let mut outgoing = driver.sync(inbound, false).expect("Should sync clients"); + outgoing + .changes + .sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id)); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"]; + let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + RemoteClient { + fxa_device_id: Some("deviceCCCCCC".to_string()), + device_name: "Fenix".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::<Vec<RemoteClient>>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + + let expected = json!([{ + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "commands": [{ + "command": "displayURI", + "args": ["http://example.com", "Fennec", "Example page"], + "flowID": "flooooooooow", + }, { + "command": "resetEngine", + "args": ["forms"], + }, { + "command": "logout", + "args": [], + }], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + }, { + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }, { + "command": "wipeEngine", + "args": ["bookmarks"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "deviceCCCCCC", + "name": "Fenix", + "type": "mobile", + "commands": [{ + "command": "wipeEngine", + "args": ["bookmarks"], + }, { + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "deviceCCCCCC", + }]); + // turn outgoing into an incoming payload. + let incoming = IncomingChangeset { + changes: outgoing + .changes + .into_iter() + .map(|c| OutgoingBso::to_test_incoming(&c)) + .collect(), + timestamp: outgoing.timestamp, + collection: outgoing.collection, + }; + if let Value::Array(expected) = expected { + for (incoming_cleartext, exp_client) in zip(incoming.changes, expected) { + let incoming_client: ClientRecord = + incoming_cleartext.into_content().content().unwrap(); + assert_eq!(incoming_client, serde_json::from_value(exp_client).unwrap()); + } + } else { + unreachable!("`expected_clients` must be an array of client records") + } + } + + #[test] + fn test_clients_sync_bad_incoming_record_skipped() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [].iter().cloned().collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let inbound = inbound_from_clients(json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "garbage", + "garbage": "value", + }, { + "id": "deviceCCCCCC", + "deleted": true, + "name": "Fenix", + "type": "mobile", + "commands": [], + "fxaDeviceId": "deviceCCCCCC", + }])); + + driver.sync(inbound, false).expect("Should sync clients"); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::<Vec<RemoteClient>>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + } + + #[test] + fn test_clients_sync_explicit_refresh() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [].iter().cloned().collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let test_clients = json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "commands": [], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + }]); + + let outgoing = driver + .sync(inbound_from_clients(test_clients.clone()), false) + .expect("Should sync clients"); + // should be no outgoing changes. + assert_eq!(outgoing.changes.len(), 0); + + // Make sure the list of recently synced remote clients is correct and + // still includes our record we didn't update. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + // Do it again - still no changes, but force a refresh. + let outgoing = driver + .sync(inbound_from_clients(test_clients), true) + .expect("Should sync clients"); + assert_eq!(outgoing.changes.len(), 1); + + // Do it again - but this time with our own client record needing + // some change. + let inbound = inbound_from_clients(json!([{ + "id": "deviceAAAAAA", + "name": "Laptop with New Name", + "type": "desktop", + "commands": [], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + }])); + let outgoing = driver.sync(inbound, false).expect("Should sync clients"); + // should still be outgoing because the name changed. + assert_eq!(outgoing.changes.len(), 1); + } + + #[test] + fn test_fresh_client_record() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: HashSet::new(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let clients = json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }]); + + let inbound = if let Value::Array(clients) = clients { + let changes = clients + .into_iter() + .map(IncomingBso::from_test_content) + .collect(); + IncomingChangeset { + changes, + timestamp: ServerTimestamp(0), + collection: COLLECTION_NAME.into(), + } + } else { + unreachable!("`clients` must be an array of client records") + }; + + // Passing false here for should_refresh_client, but it should be + // ignored as we don't have an existing record yet. + let mut outgoing = driver.sync(inbound, false).expect("Should sync clients"); + outgoing + .changes + .sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id)); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::<Vec<RemoteClient>>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + + let expected = json!([{ + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + "ttl": CLIENTS_TTL, + }]); + if let Value::Array(expected) = expected { + // turn outgoing into an incoming payload. + let incoming = IncomingChangeset { + changes: outgoing + .changes + .into_iter() + .map(|c| OutgoingBso::to_test_incoming(&c)) + .collect(), + timestamp: outgoing.timestamp, + collection: outgoing.collection, + }; + for (incoming_cleartext, record) in zip(incoming.changes, expected) { + let incoming_client: ClientRecord = + incoming_cleartext.into_content().content().unwrap(); + assert_eq!(incoming_client, serde_json::from_value(record).unwrap()); + } + } else { + unreachable!("`expected_clients` must be an array of client records") + } + } +} diff --git a/third_party/rust/sync15/src/clients_engine/mod.rs b/third_party/rust/sync15/src/clients_engine/mod.rs new file mode 100644 index 0000000000..7346712dc7 --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/mod.rs @@ -0,0 +1,93 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! The client engine is a [crate::engine](Sync Engine) used to manage the +//! "clients" collection. The clients engine manages the client record for +//! "this device, and also manages "commands". +//! In short, commands target one or more engines and instruct them to +//! perform various operations - such as wiping all local data. +//! These commands are used very rarely - currently the only command used +//! in practice is for bookmarks to wipe all their data, which is sent when +//! a desktop device restores all bookmarks from a backup. In this scenario, +//! desktop will delete all local bookmarks then replace them with the backed +//! up set, which without a "wipe" command would almost certainly cause other +//! connected devices to "resurrect" the deleted bookmarks. +use std::collections::HashSet; + +mod engine; +mod record; +mod ser; + +use crate::DeviceType; +use anyhow::Result; +pub use engine::Engine; + +// These are what desktop uses. +const CLIENTS_TTL: u32 = 15_552_000; // 180 days +pub(crate) const CLIENTS_TTL_REFRESH: u64 = 604_800; // 7 days + +/// A command processor applies incoming commands like wipes and resets for all +/// stores, and returns commands to send to other clients. It also manages +/// settings like the device name and type, which is stored in the special +/// `clients` collection. +/// +/// In practice, this trait only has one implementation, in the sync manager. +/// It's split this way because the clients engine depends on internal `sync15` +/// structures, and can't be implemented as a syncable store...but `sync15` +/// doesn't know anything about multiple engines. This lets the sync manager +/// provide its own implementation for handling wipe and reset commands for all +/// the engines that it manages. +pub trait CommandProcessor { + fn settings(&self) -> &Settings; + + /// Fetches commands to send to other clients. An error return value means + /// commands couldn't be fetched, and halts the sync. + fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>>; + + /// Applies a command sent to this client from another client. This method + /// should return a `CommandStatus` indicating whether the command was + /// processed. + /// + /// An error return value means the sync manager encountered an error + /// applying the command, and halts the sync to prevent unexpected behavior + /// (for example, merging local and remote bookmarks, when we were told to + /// wipe our local bookmarks). + fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus>; +} + +/// Indicates if a command was applied successfully, ignored, or not supported. +/// Applied and ignored commands are removed from our client record, and never +/// retried. Unsupported commands are put back into our record, and retried on +/// subsequent syncs. This is to handle clients adding support for new data +/// types. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum CommandStatus { + Applied, + Ignored, + Unsupported, +} + +/// Information about this device to include in its client record. This should +/// be persisted across syncs, as part of the sync manager state. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct Settings { + /// The FxA device ID of this client, also used as this client's record ID + /// in the clients collection. + pub fxa_device_id: String, + /// The name of this client. This should match the client's name in the + /// FxA device manager. + pub device_name: String, + /// The type of this client: mobile, tablet, desktop, or other. + pub device_type: DeviceType, +} + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum Command { + /// Erases all local data for a specific engine. + Wipe(String), + /// Resets local sync state for all engines. + ResetAll, + /// Resets local sync state for a specific engine. + Reset(String), +} diff --git a/third_party/rust/sync15/src/clients_engine/record.rs b/third_party/rust/sync15/src/clients_engine/record.rs new file mode 100644 index 0000000000..12de36d82f --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/record.rs @@ -0,0 +1,124 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use serde_derive::*; + +use super::Command; + +/// The serialized form of a client record. +#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClientRecord { + #[serde(rename = "id")] + pub id: String, + + pub name: String, + + #[serde(default, rename = "type")] + pub typ: Option<crate::DeviceType>, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub commands: Vec<CommandRecord>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub fxa_device_id: Option<String>, + + /// `version`, `protocols`, `formfactor`, `os`, `appPackage`, `application`, + /// and `device` are unused and optional in all implementations (Desktop, + /// iOS, and Fennec), but we round-trip them. + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub version: Option<String>, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub protocols: Vec<String>, + + #[serde( + default, + rename = "formfactor", + skip_serializing_if = "Option::is_none" + )] + pub form_factor: Option<String>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub os: Option<String>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub app_package: Option<String>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub application: Option<String>, + + /// The model of the device, like "iPhone" or "iPod touch" on iOS. Note + /// that this is _not_ the client ID (`id`) or the FxA device ID + /// (`fxa_device_id`). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub device: Option<String>, +} + +impl From<&ClientRecord> for crate::RemoteClient { + fn from(record: &ClientRecord) -> crate::RemoteClient { + crate::RemoteClient { + fxa_device_id: record.fxa_device_id.clone(), + device_name: record.name.clone(), + device_type: record.typ, + } + } +} + +/// The serialized form of a client command. +#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CommandRecord { + /// The command name. This is a string, not an enum, because we want to + /// round-trip commands that we don't support yet. + #[serde(rename = "command")] + pub name: String, + + /// Extra, command-specific arguments. Note that we must send an empty + /// array if the command expects no arguments. + #[serde(default)] + pub args: Vec<String>, + + /// Some commands, like repair, send a "flow ID" that other cliennts can + /// record in their telemetry. We don't currently send commands with + /// flow IDs, but we round-trip them. + #[serde(default, rename = "flowID", skip_serializing_if = "Option::is_none")] + pub flow_id: Option<String>, +} + +impl CommandRecord { + /// Converts a serialized command into one that we can apply. Returns `None` + /// if we don't support the command. + pub fn as_command(&self) -> Option<Command> { + match self.name.as_str() { + "wipeEngine" => self.args.get(0).map(|e| Command::Wipe(e.into())), + "resetEngine" => self.args.get(0).map(|e| Command::Reset(e.into())), + "resetAll" => Some(Command::ResetAll), + _ => None, + } + } +} + +impl From<Command> for CommandRecord { + fn from(command: Command) -> CommandRecord { + match command { + Command::Wipe(engine) => CommandRecord { + name: "wipeEngine".into(), + args: vec![engine], + flow_id: None, + }, + Command::Reset(engine) => CommandRecord { + name: "resetEngine".into(), + args: vec![engine], + flow_id: None, + }, + Command::ResetAll => CommandRecord { + name: "resetAll".into(), + args: Vec::new(), + flow_id: None, + }, + } + } +} diff --git a/third_party/rust/sync15/src/clients_engine/ser.rs b/third_party/rust/sync15/src/clients_engine/ser.rs new file mode 100644 index 0000000000..2e7b0817b8 --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/ser.rs @@ -0,0 +1,125 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::Result; +use serde::Serialize; +use std::io::{self, Write}; + +/// A writer that counts the number of bytes it's asked to write, and discards +/// the data. Used to calculate the serialized size of the commands list. +#[derive(Clone, Copy, Default)] +pub struct WriteCount(usize); + +impl WriteCount { + #[inline] + pub fn len(self) -> usize { + self.0 + } +} + +impl Write for WriteCount { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0 += buf.len(); + Ok(buf.len()) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +/// Returns the size of the given value, in bytes, when serialized to JSON. +fn compute_serialized_size<T: Serialize>(value: &T) -> Result<usize> { + let mut w = WriteCount::default(); + serde_json::to_writer(&mut w, value)?; + Ok(w.len()) +} + +/// Truncates `list` to fit within `payload_size_max_bytes` when serialized to +/// JSON. +pub fn shrink_to_fit<T: Serialize>(list: &mut Vec<T>, payload_size_max_bytes: usize) -> Result<()> { + let size = compute_serialized_size(&list)?; + // See bug 535326 comment 8 for an explanation of the estimation + match ((payload_size_max_bytes / 4) * 3).checked_sub(1500) { + Some(max_serialized_size) => { + if size > max_serialized_size { + // Estimate a little more than the direct fraction to maximize packing + let cutoff = (list.len() * max_serialized_size - 1) / size + 1; + list.truncate(cutoff + 1); + // Keep dropping off the last entry until the data fits. + while compute_serialized_size(&list)? > max_serialized_size { + if list.pop().is_none() { + break; + } + } + } + Ok(()) + } + None => { + list.clear(); + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::super::record::CommandRecord; + use super::*; + + #[test] + fn test_compute_serialized_size() { + assert_eq!(compute_serialized_size(&1).unwrap(), 1); + assert_eq!(compute_serialized_size(&"hi").unwrap(), 4); + assert_eq!( + compute_serialized_size(&["hi", "hello", "bye"]).unwrap(), + 20 + ); + } + + #[test] + fn test_shrink_to_fit() { + let mut commands = vec![ + CommandRecord { + name: "wipeEngine".into(), + args: vec!["bookmarks".into()], + flow_id: Some("flow".into()), + }, + CommandRecord { + name: "resetEngine".into(), + args: vec!["history".into()], + flow_id: Some("flow".into()), + }, + CommandRecord { + name: "logout".into(), + args: Vec::new(), + flow_id: None, + }, + ]; + + // 4096 bytes is enough to fit all three commands. + shrink_to_fit(&mut commands, 4096).unwrap(); + assert_eq!(commands.len(), 3); + + let sizes = commands + .iter() + .map(|c| compute_serialized_size(c).unwrap()) + .collect::<Vec<_>>(); + assert_eq!(sizes, &[61, 60, 30]); + + // `logout` won't fit within 2168 bytes. + shrink_to_fit(&mut commands, 2168).unwrap(); + assert_eq!(commands.len(), 2); + + // `resetEngine` won't fit within 2084 bytes. + shrink_to_fit(&mut commands, 2084).unwrap(); + assert_eq!(commands.len(), 1); + + // `wipeEngine` won't fit at all. + shrink_to_fit(&mut commands, 1024).unwrap(); + assert!(commands.is_empty()); + } +} |