summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15/src/clients_engine
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15/src/clients_engine')
-rw-r--r--third_party/rust/sync15/src/clients_engine/engine.rs814
-rw-r--r--third_party/rust/sync15/src/clients_engine/mod.rs93
-rw-r--r--third_party/rust/sync15/src/clients_engine/record.rs124
-rw-r--r--third_party/rust/sync15/src/clients_engine/ser.rs125
4 files changed, 1156 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/clients_engine/engine.rs b/third_party/rust/sync15/src/clients_engine/engine.rs
new file mode 100644
index 0000000000..f3d6242126
--- /dev/null
+++ b/third_party/rust/sync15/src/clients_engine/engine.rs
@@ -0,0 +1,814 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::collections::{HashMap, HashSet};
+
+use crate::bso::{IncomingKind, OutgoingBso, OutgoingEnvelope};
+use crate::client::{
+ CollState, CollectionKeys, CollectionUpdate, GlobalState, InfoConfiguration,
+ Sync15StorageClient,
+};
+use crate::client_types::{ClientData, RemoteClient};
+use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset};
+use crate::{error::Result, Guid, KeyBundle};
+use interrupt_support::Interruptee;
+
+use super::{
+ record::{ClientRecord, CommandRecord},
+ ser::shrink_to_fit,
+ Command, CommandProcessor, CommandStatus, CLIENTS_TTL,
+};
+
+const COLLECTION_NAME: &str = "clients";
+
+/// The driver for the clients engine. Internal; split out from the `Engine`
+/// struct to make testing easier.
+struct Driver<'a> {
+ command_processor: &'a dyn CommandProcessor,
+ interruptee: &'a dyn Interruptee,
+ config: &'a InfoConfiguration,
+ recent_clients: HashMap<String, RemoteClient>,
+}
+
+impl<'a> Driver<'a> {
+ fn new(
+ command_processor: &'a dyn CommandProcessor,
+ interruptee: &'a dyn Interruptee,
+ config: &'a InfoConfiguration,
+ ) -> Driver<'a> {
+ Driver {
+ command_processor,
+ interruptee,
+ config,
+ recent_clients: HashMap::new(),
+ }
+ }
+
+ fn note_recent_client(&mut self, client: &ClientRecord) {
+ self.recent_clients.insert(client.id.clone(), client.into());
+ }
+
+ fn sync(
+ &mut self,
+ inbound: IncomingChangeset,
+ should_refresh_client: bool,
+ ) -> Result<OutgoingChangeset> {
+ let mut outgoing = OutgoingChangeset::new(COLLECTION_NAME, inbound.timestamp);
+ outgoing.timestamp = inbound.timestamp;
+
+ self.interruptee.err_if_interrupted()?;
+ let outgoing_commands = self.command_processor.fetch_outgoing_commands()?;
+
+ let mut has_own_client_record = false;
+
+ for bso in inbound.changes {
+ self.interruptee.err_if_interrupted()?;
+
+ let content = bso.into_content();
+
+ let client: ClientRecord = match content.kind {
+ IncomingKind::Malformed => {
+ log::debug!("Error unpacking record");
+ continue;
+ }
+ IncomingKind::Tombstone => {
+ log::debug!("Record has been deleted; skipping...");
+ continue;
+ }
+ IncomingKind::Content(client) => client,
+ };
+
+ if client.id == self.command_processor.settings().fxa_device_id {
+ log::debug!("Found my record on the server");
+ // If we see our own client record, apply any incoming commands,
+ // remove them from the list, and reupload the record. Any
+ // commands that we don't understand also go back in the list.
+ // https://github.com/mozilla/application-services/issues/1800
+ // tracks if that's the right thing to do.
+ has_own_client_record = true;
+ let mut current_client_record = self.current_client_record();
+ for c in &client.commands {
+ let status = match c.as_command() {
+ Some(command) => self.command_processor.apply_incoming_command(command)?,
+ None => CommandStatus::Unsupported,
+ };
+ match status {
+ CommandStatus::Applied => {}
+ CommandStatus::Ignored => {
+ log::debug!("Ignored command {:?}", c);
+ }
+ CommandStatus::Unsupported => {
+ log::warn!("Don't know how to apply command {:?}", c);
+ current_client_record.commands.push(c.clone());
+ }
+ }
+ }
+
+ // The clients collection has a hard limit on the payload size,
+ // after which the server starts rejecting our records. Large
+ // command lists can cause us to exceed this, so we truncate
+ // the list.
+ shrink_to_fit(
+ &mut current_client_record.commands,
+ self.memcache_max_record_payload_size(),
+ )?;
+
+ // Add the new client record to our map of recently synced
+ // clients, so that downstream consumers like synced tabs can
+ // access them.
+ self.note_recent_client(&current_client_record);
+
+ // We periodically upload our own client record, even if it
+ // doesn't change, to keep it fresh.
+ if should_refresh_client || client != current_client_record {
+ log::debug!("Will update our client record on the server");
+ let envelope = OutgoingEnvelope {
+ id: content.envelope.id,
+ ttl: Some(CLIENTS_TTL),
+ ..Default::default()
+ };
+ outgoing
+ .changes
+ .push(OutgoingBso::from_content(envelope, current_client_record)?);
+ }
+ } else {
+ // Add the other client to our map of recently synced clients.
+ self.note_recent_client(&client);
+
+ // Bail if we don't have any outgoing commands to write into
+ // the other client's record.
+ if outgoing_commands.is_empty() {
+ continue;
+ }
+
+ // Determine if we have new commands, that aren't already in the
+ // client's command list.
+ let current_commands: HashSet<Command> = client
+ .commands
+ .iter()
+ .filter_map(|c| c.as_command())
+ .collect();
+ let mut new_outgoing_commands = outgoing_commands
+ .difference(&current_commands)
+ .cloned()
+ .collect::<Vec<_>>();
+ // Sort, to ensure deterministic ordering for tests.
+ new_outgoing_commands.sort();
+ let mut new_client = client.clone();
+ new_client
+ .commands
+ .extend(new_outgoing_commands.into_iter().map(CommandRecord::from));
+ if new_client.commands.len() == client.commands.len() {
+ continue;
+ }
+
+ // Hooray, we added new commands! Make sure the record still
+ // fits in the maximum record size, or the server will reject
+ // our upload.
+ shrink_to_fit(
+ &mut new_client.commands,
+ self.memcache_max_record_payload_size(),
+ )?;
+
+ let envelope = OutgoingEnvelope {
+ id: content.envelope.id,
+ ttl: Some(CLIENTS_TTL),
+ ..Default::default()
+ };
+ outgoing
+ .changes
+ .push(OutgoingBso::from_content(envelope, new_client)?);
+ }
+ }
+
+ // Upload a record for our own client, if we didn't replace it already.
+ if !has_own_client_record {
+ let current_client_record = self.current_client_record();
+ self.note_recent_client(&current_client_record);
+ let envelope = OutgoingEnvelope {
+ id: Guid::new(&current_client_record.id),
+ ttl: Some(CLIENTS_TTL),
+ ..Default::default()
+ };
+ outgoing
+ .changes
+ .push(OutgoingBso::from_content(envelope, current_client_record)?);
+ }
+
+ Ok(outgoing)
+ }
+
+ /// Builds a fresh client record for this device.
+ fn current_client_record(&self) -> ClientRecord {
+ let settings = self.command_processor.settings();
+ ClientRecord {
+ id: settings.fxa_device_id.clone(),
+ name: settings.device_name.clone(),
+ typ: settings.device_type.into(),
+ commands: Vec::new(),
+ fxa_device_id: Some(settings.fxa_device_id.clone()),
+ version: None,
+ protocols: vec!["1.5".into()],
+ form_factor: None,
+ os: None,
+ app_package: None,
+ application: None,
+ device: None,
+ }
+ }
+
+ fn max_record_payload_size(&self) -> usize {
+ let payload_max = self.config.max_record_payload_bytes;
+ if payload_max <= self.config.max_post_bytes {
+ self.config.max_post_bytes.saturating_sub(4096)
+ } else {
+ payload_max
+ }
+ }
+
+ /// Collections stored in memcached ("tabs", "clients" or "meta") have a
+ /// different max size than ones stored in the normal storage server db.
+ /// In practice, the real limit here is 1M (bug 1300451 comment 40), but
+ /// there's overhead involved that is hard to calculate on the client, so we
+ /// use 512k to be safe (at the recommendation of the server team). Note
+ /// that if the server reports a lower limit (via info/configuration), we
+ /// respect that limit instead. See also bug 1403052.
+ /// XXX - the above comment is stale and refers to the world before the
+ /// move to spanner and the rust sync server.
+ fn memcache_max_record_payload_size(&self) -> usize {
+ self.max_record_payload_size().min(512 * 1024)
+ }
+}
+
+pub struct Engine<'a> {
+ pub command_processor: &'a dyn CommandProcessor,
+ pub interruptee: &'a dyn Interruptee,
+ pub recent_clients: HashMap<String, RemoteClient>,
+}
+
+impl<'a> Engine<'a> {
+ /// Creates a new clients engine that delegates to the given command
+ /// processor to apply incoming commands.
+ pub fn new<'b>(
+ command_processor: &'b dyn CommandProcessor,
+ interruptee: &'b dyn Interruptee,
+ ) -> Engine<'b> {
+ Engine {
+ command_processor,
+ interruptee,
+ recent_clients: HashMap::new(),
+ }
+ }
+
+ /// Syncs the clients collection. This works a little differently than
+ /// other collections:
+ ///
+ /// 1. It can't be disabled or declined.
+ /// 2. The sync ID and last sync time aren't meaningful, since we always
+ /// fetch all client records on every sync. As such, the
+ /// `LocalCollStateMachine` that we use for other engines doesn't
+ /// apply to it.
+ /// 3. It doesn't persist state directly, but relies on the sync manager
+ /// to persist device settings, and process commands.
+ /// 4. Failing to sync the clients collection is fatal, and aborts the
+ /// sync.
+ ///
+ /// For these reasons, we implement this engine directly in the `sync15`
+ /// crate, and provide a specialized `sync` method instead of implementing
+ /// `sync15::Store`.
+ pub fn sync(
+ &mut self,
+ storage_client: &Sync15StorageClient,
+ global_state: &GlobalState,
+ root_sync_key: &KeyBundle,
+ should_refresh_client: bool,
+ ) -> Result<()> {
+ log::info!("Syncing collection clients");
+
+ let coll_keys = CollectionKeys::from_encrypted_payload(
+ global_state.keys.clone(),
+ global_state.keys_timestamp,
+ root_sync_key,
+ )?;
+ let mut coll_state = CollState {
+ config: global_state.config.clone(),
+ last_modified: global_state
+ .collections
+ .get(COLLECTION_NAME)
+ .cloned()
+ .unwrap_or_default(),
+ key: coll_keys.key_for_collection(COLLECTION_NAME).clone(),
+ };
+
+ let inbound = self.fetch_incoming(storage_client, &mut coll_state)?;
+
+ let mut driver = Driver::new(
+ self.command_processor,
+ self.interruptee,
+ &global_state.config,
+ );
+
+ let outgoing = driver.sync(inbound, should_refresh_client)?;
+ self.recent_clients = driver.recent_clients;
+
+ coll_state.last_modified = outgoing.timestamp;
+
+ self.interruptee.err_if_interrupted()?;
+ let upload_info =
+ CollectionUpdate::new_from_changeset(storage_client, &coll_state, outgoing, true)?
+ .upload()?;
+
+ log::info!(
+ "Upload success ({} records success, {} records failed)",
+ upload_info.successful_ids.len(),
+ upload_info.failed_ids.len()
+ );
+
+ log::info!("Finished syncing clients");
+ Ok(())
+ }
+
+ fn fetch_incoming(
+ &self,
+ storage_client: &Sync15StorageClient,
+ coll_state: &mut CollState,
+ ) -> Result<IncomingChangeset> {
+ // Note that, unlike other stores, we always fetch the full collection
+ // on every sync, so `inbound` will return all clients, not just the
+ // ones that changed since the last sync.
+ let coll_request = CollectionRequest::new(COLLECTION_NAME).full();
+
+ self.interruptee.err_if_interrupted()?;
+ let inbound = crate::client::fetch_incoming(storage_client, coll_state, &coll_request)?;
+
+ Ok(inbound)
+ }
+
+ pub fn local_client_id(&self) -> String {
+ // Bit dirty but it's the easiest way to reach to our own
+ // device ID without refactoring the whole sync manager crate.
+ self.command_processor.settings().fxa_device_id.clone()
+ }
+
+ pub fn get_client_data(&self) -> ClientData {
+ ClientData {
+ local_client_id: self.local_client_id(),
+ recent_clients: self.recent_clients.clone(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::super::{CommandStatus, DeviceType, Settings};
+ use super::*;
+ use crate::bso::IncomingBso;
+ use crate::ServerTimestamp;
+ use anyhow::Result;
+ use interrupt_support::NeverInterrupts;
+ use serde_json::{json, Value};
+ use std::iter::zip;
+
+ struct TestProcessor {
+ settings: Settings,
+ outgoing_commands: HashSet<Command>,
+ }
+
+ impl CommandProcessor for TestProcessor {
+ fn settings(&self) -> &Settings {
+ &self.settings
+ }
+
+ fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus> {
+ Ok(if let Command::Reset(name) = command {
+ if name == "forms" {
+ CommandStatus::Unsupported
+ } else {
+ CommandStatus::Applied
+ }
+ } else {
+ CommandStatus::Ignored
+ })
+ }
+
+ fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>> {
+ Ok(self.outgoing_commands.clone())
+ }
+ }
+
+ fn inbound_from_clients(clients: Value) -> IncomingChangeset {
+ if let Value::Array(clients) = clients {
+ let changes = clients
+ .into_iter()
+ .map(IncomingBso::from_test_content)
+ .collect();
+ IncomingChangeset {
+ changes,
+ timestamp: ServerTimestamp(0),
+ collection: COLLECTION_NAME.into(),
+ }
+ } else {
+ unreachable!("`clients` must be an array of client records")
+ }
+ }
+
+ #[test]
+ fn test_clients_sync() {
+ let processor = TestProcessor {
+ settings: Settings {
+ fxa_device_id: "deviceAAAAAA".into(),
+ device_name: "Laptop".into(),
+ device_type: DeviceType::Desktop,
+ },
+ outgoing_commands: [
+ Command::Wipe("bookmarks".into()),
+ Command::Reset("history".into()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ };
+
+ let config = InfoConfiguration::default();
+
+ let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
+
+ let inbound = inbound_from_clients(json!([{
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ }, {
+ "id": "deviceCCCCCC",
+ "name": "Fenix",
+ "type": "mobile",
+ "commands": [],
+ "fxaDeviceId": "deviceCCCCCC",
+ }, {
+ "id": "deviceAAAAAA",
+ "name": "Laptop with a different name",
+ "type": "desktop",
+ "commands": [{
+ "command": "wipeEngine",
+ "args": ["logins"]
+ }, {
+ "command": "displayURI",
+ "args": ["http://example.com", "Fennec", "Example page"],
+ "flowID": "flooooooooow",
+ }, {
+ "command": "resetEngine",
+ "args": ["forms"],
+ }, {
+ "command": "logout",
+ "args": [],
+ }],
+ "fxaDeviceId": "deviceAAAAAA",
+ }]));
+
+ // Passing false for `should_refresh_client` - it should be ignored
+ // because we've changed the commands.
+ let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
+ outgoing
+ .changes
+ .sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
+
+ // Make sure the list of recently synced remote clients is correct.
+ let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"];
+ let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
+ actual_ids.sort();
+ assert_eq!(actual_ids, expected_ids);
+
+ let expected_remote_clients = &[
+ RemoteClient {
+ fxa_device_id: Some("deviceAAAAAA".to_string()),
+ device_name: "Laptop".into(),
+ device_type: Some(DeviceType::Desktop),
+ },
+ RemoteClient {
+ fxa_device_id: Some("iPhooooooone".to_string()),
+ device_name: "iPhone".into(),
+ device_type: Some(DeviceType::Mobile),
+ },
+ RemoteClient {
+ fxa_device_id: Some("deviceCCCCCC".to_string()),
+ device_name: "Fenix".into(),
+ device_type: Some(DeviceType::Mobile),
+ },
+ ];
+ let actual_remote_clients = expected_ids
+ .iter()
+ .filter_map(|&id| driver.recent_clients.get(id))
+ .cloned()
+ .collect::<Vec<RemoteClient>>();
+ assert_eq!(actual_remote_clients, expected_remote_clients);
+
+ let expected = json!([{
+ "id": "deviceAAAAAA",
+ "name": "Laptop",
+ "type": "desktop",
+ "commands": [{
+ "command": "displayURI",
+ "args": ["http://example.com", "Fennec", "Example page"],
+ "flowID": "flooooooooow",
+ }, {
+ "command": "resetEngine",
+ "args": ["forms"],
+ }, {
+ "command": "logout",
+ "args": [],
+ }],
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ }, {
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }, {
+ "command": "wipeEngine",
+ "args": ["bookmarks"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ }, {
+ "id": "deviceCCCCCC",
+ "name": "Fenix",
+ "type": "mobile",
+ "commands": [{
+ "command": "wipeEngine",
+ "args": ["bookmarks"],
+ }, {
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "deviceCCCCCC",
+ }]);
+ // turn outgoing into an incoming payload.
+ let incoming = IncomingChangeset {
+ changes: outgoing
+ .changes
+ .into_iter()
+ .map(|c| OutgoingBso::to_test_incoming(&c))
+ .collect(),
+ timestamp: outgoing.timestamp,
+ collection: outgoing.collection,
+ };
+ if let Value::Array(expected) = expected {
+ for (incoming_cleartext, exp_client) in zip(incoming.changes, expected) {
+ let incoming_client: ClientRecord =
+ incoming_cleartext.into_content().content().unwrap();
+ assert_eq!(incoming_client, serde_json::from_value(exp_client).unwrap());
+ }
+ } else {
+ unreachable!("`expected_clients` must be an array of client records")
+ }
+ }
+
+ #[test]
+ fn test_clients_sync_bad_incoming_record_skipped() {
+ let processor = TestProcessor {
+ settings: Settings {
+ fxa_device_id: "deviceAAAAAA".into(),
+ device_name: "Laptop".into(),
+ device_type: DeviceType::Desktop,
+ },
+ outgoing_commands: [].iter().cloned().collect(),
+ };
+
+ let config = InfoConfiguration::default();
+
+ let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
+
+ let inbound = inbound_from_clients(json!([{
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ }, {
+ "id": "garbage",
+ "garbage": "value",
+ }, {
+ "id": "deviceCCCCCC",
+ "deleted": true,
+ "name": "Fenix",
+ "type": "mobile",
+ "commands": [],
+ "fxaDeviceId": "deviceCCCCCC",
+ }]));
+
+ driver.sync(inbound, false).expect("Should sync clients");
+
+ // Make sure the list of recently synced remote clients is correct.
+ let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
+ let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
+ actual_ids.sort();
+ assert_eq!(actual_ids, expected_ids);
+
+ let expected_remote_clients = &[
+ RemoteClient {
+ fxa_device_id: Some("deviceAAAAAA".to_string()),
+ device_name: "Laptop".into(),
+ device_type: Some(DeviceType::Desktop),
+ },
+ RemoteClient {
+ fxa_device_id: Some("iPhooooooone".to_string()),
+ device_name: "iPhone".into(),
+ device_type: Some(DeviceType::Mobile),
+ },
+ ];
+ let actual_remote_clients = expected_ids
+ .iter()
+ .filter_map(|&id| driver.recent_clients.get(id))
+ .cloned()
+ .collect::<Vec<RemoteClient>>();
+ assert_eq!(actual_remote_clients, expected_remote_clients);
+ }
+
+ #[test]
+ fn test_clients_sync_explicit_refresh() {
+ let processor = TestProcessor {
+ settings: Settings {
+ fxa_device_id: "deviceAAAAAA".into(),
+ device_name: "Laptop".into(),
+ device_type: DeviceType::Desktop,
+ },
+ outgoing_commands: [].iter().cloned().collect(),
+ };
+
+ let config = InfoConfiguration::default();
+
+ let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
+
+ let test_clients = json!([{
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ }, {
+ "id": "deviceAAAAAA",
+ "name": "Laptop",
+ "type": "desktop",
+ "commands": [],
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ }]);
+
+ let outgoing = driver
+ .sync(inbound_from_clients(test_clients.clone()), false)
+ .expect("Should sync clients");
+ // should be no outgoing changes.
+ assert_eq!(outgoing.changes.len(), 0);
+
+ // Make sure the list of recently synced remote clients is correct and
+ // still includes our record we didn't update.
+ let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
+ let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
+ actual_ids.sort();
+ assert_eq!(actual_ids, expected_ids);
+
+ // Do it again - still no changes, but force a refresh.
+ let outgoing = driver
+ .sync(inbound_from_clients(test_clients), true)
+ .expect("Should sync clients");
+ assert_eq!(outgoing.changes.len(), 1);
+
+ // Do it again - but this time with our own client record needing
+ // some change.
+ let inbound = inbound_from_clients(json!([{
+ "id": "deviceAAAAAA",
+ "name": "Laptop with New Name",
+ "type": "desktop",
+ "commands": [],
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ }]));
+ let outgoing = driver.sync(inbound, false).expect("Should sync clients");
+ // should still be outgoing because the name changed.
+ assert_eq!(outgoing.changes.len(), 1);
+ }
+
+ #[test]
+ fn test_fresh_client_record() {
+ let processor = TestProcessor {
+ settings: Settings {
+ fxa_device_id: "deviceAAAAAA".into(),
+ device_name: "Laptop".into(),
+ device_type: DeviceType::Desktop,
+ },
+ outgoing_commands: HashSet::new(),
+ };
+
+ let config = InfoConfiguration::default();
+
+ let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
+
+ let clients = json!([{
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ }]);
+
+ let inbound = if let Value::Array(clients) = clients {
+ let changes = clients
+ .into_iter()
+ .map(IncomingBso::from_test_content)
+ .collect();
+ IncomingChangeset {
+ changes,
+ timestamp: ServerTimestamp(0),
+ collection: COLLECTION_NAME.into(),
+ }
+ } else {
+ unreachable!("`clients` must be an array of client records")
+ };
+
+ // Passing false here for should_refresh_client, but it should be
+ // ignored as we don't have an existing record yet.
+ let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
+ outgoing
+ .changes
+ .sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
+
+ // Make sure the list of recently synced remote clients is correct.
+ let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
+ let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
+ actual_ids.sort();
+ assert_eq!(actual_ids, expected_ids);
+
+ let expected_remote_clients = &[
+ RemoteClient {
+ fxa_device_id: Some("deviceAAAAAA".to_string()),
+ device_name: "Laptop".into(),
+ device_type: Some(DeviceType::Desktop),
+ },
+ RemoteClient {
+ fxa_device_id: Some("iPhooooooone".to_string()),
+ device_name: "iPhone".into(),
+ device_type: Some(DeviceType::Mobile),
+ },
+ ];
+ let actual_remote_clients = expected_ids
+ .iter()
+ .filter_map(|&id| driver.recent_clients.get(id))
+ .cloned()
+ .collect::<Vec<RemoteClient>>();
+ assert_eq!(actual_remote_clients, expected_remote_clients);
+
+ let expected = json!([{
+ "id": "deviceAAAAAA",
+ "name": "Laptop",
+ "type": "desktop",
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ "ttl": CLIENTS_TTL,
+ }]);
+ if let Value::Array(expected) = expected {
+ // turn outgoing into an incoming payload.
+ let incoming = IncomingChangeset {
+ changes: outgoing
+ .changes
+ .into_iter()
+ .map(|c| OutgoingBso::to_test_incoming(&c))
+ .collect(),
+ timestamp: outgoing.timestamp,
+ collection: outgoing.collection,
+ };
+ for (incoming_cleartext, record) in zip(incoming.changes, expected) {
+ let incoming_client: ClientRecord =
+ incoming_cleartext.into_content().content().unwrap();
+ assert_eq!(incoming_client, serde_json::from_value(record).unwrap());
+ }
+ } else {
+ unreachable!("`expected_clients` must be an array of client records")
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/clients_engine/mod.rs b/third_party/rust/sync15/src/clients_engine/mod.rs
new file mode 100644
index 0000000000..7346712dc7
--- /dev/null
+++ b/third_party/rust/sync15/src/clients_engine/mod.rs
@@ -0,0 +1,93 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! The client engine is a [crate::engine](Sync Engine) used to manage the
+//! "clients" collection. The clients engine manages the client record for
+//! "this device, and also manages "commands".
+//! In short, commands target one or more engines and instruct them to
+//! perform various operations - such as wiping all local data.
+//! These commands are used very rarely - currently the only command used
+//! in practice is for bookmarks to wipe all their data, which is sent when
+//! a desktop device restores all bookmarks from a backup. In this scenario,
+//! desktop will delete all local bookmarks then replace them with the backed
+//! up set, which without a "wipe" command would almost certainly cause other
+//! connected devices to "resurrect" the deleted bookmarks.
+use std::collections::HashSet;
+
+mod engine;
+mod record;
+mod ser;
+
+use crate::DeviceType;
+use anyhow::Result;
+pub use engine::Engine;
+
+// These are what desktop uses.
+const CLIENTS_TTL: u32 = 15_552_000; // 180 days
+pub(crate) const CLIENTS_TTL_REFRESH: u64 = 604_800; // 7 days
+
+/// A command processor applies incoming commands like wipes and resets for all
+/// stores, and returns commands to send to other clients. It also manages
+/// settings like the device name and type, which is stored in the special
+/// `clients` collection.
+///
+/// In practice, this trait only has one implementation, in the sync manager.
+/// It's split this way because the clients engine depends on internal `sync15`
+/// structures, and can't be implemented as a syncable store...but `sync15`
+/// doesn't know anything about multiple engines. This lets the sync manager
+/// provide its own implementation for handling wipe and reset commands for all
+/// the engines that it manages.
+pub trait CommandProcessor {
+ fn settings(&self) -> &Settings;
+
+ /// Fetches commands to send to other clients. An error return value means
+ /// commands couldn't be fetched, and halts the sync.
+ fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>>;
+
+ /// Applies a command sent to this client from another client. This method
+ /// should return a `CommandStatus` indicating whether the command was
+ /// processed.
+ ///
+ /// An error return value means the sync manager encountered an error
+ /// applying the command, and halts the sync to prevent unexpected behavior
+ /// (for example, merging local and remote bookmarks, when we were told to
+ /// wipe our local bookmarks).
+ fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus>;
+}
+
+/// Indicates if a command was applied successfully, ignored, or not supported.
+/// Applied and ignored commands are removed from our client record, and never
+/// retried. Unsupported commands are put back into our record, and retried on
+/// subsequent syncs. This is to handle clients adding support for new data
+/// types.
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+pub enum CommandStatus {
+ Applied,
+ Ignored,
+ Unsupported,
+}
+
+/// Information about this device to include in its client record. This should
+/// be persisted across syncs, as part of the sync manager state.
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub struct Settings {
+ /// The FxA device ID of this client, also used as this client's record ID
+ /// in the clients collection.
+ pub fxa_device_id: String,
+ /// The name of this client. This should match the client's name in the
+ /// FxA device manager.
+ pub device_name: String,
+ /// The type of this client: mobile, tablet, desktop, or other.
+ pub device_type: DeviceType,
+}
+
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum Command {
+ /// Erases all local data for a specific engine.
+ Wipe(String),
+ /// Resets local sync state for all engines.
+ ResetAll,
+ /// Resets local sync state for a specific engine.
+ Reset(String),
+}
diff --git a/third_party/rust/sync15/src/clients_engine/record.rs b/third_party/rust/sync15/src/clients_engine/record.rs
new file mode 100644
index 0000000000..12de36d82f
--- /dev/null
+++ b/third_party/rust/sync15/src/clients_engine/record.rs
@@ -0,0 +1,124 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use serde_derive::*;
+
+use super::Command;
+
+/// The serialized form of a client record.
+#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ClientRecord {
+ #[serde(rename = "id")]
+ pub id: String,
+
+ pub name: String,
+
+ #[serde(default, rename = "type")]
+ pub typ: Option<crate::DeviceType>,
+
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub commands: Vec<CommandRecord>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub fxa_device_id: Option<String>,
+
+ /// `version`, `protocols`, `formfactor`, `os`, `appPackage`, `application`,
+ /// and `device` are unused and optional in all implementations (Desktop,
+ /// iOS, and Fennec), but we round-trip them.
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub version: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub protocols: Vec<String>,
+
+ #[serde(
+ default,
+ rename = "formfactor",
+ skip_serializing_if = "Option::is_none"
+ )]
+ pub form_factor: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub os: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub app_package: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub application: Option<String>,
+
+ /// The model of the device, like "iPhone" or "iPod touch" on iOS. Note
+ /// that this is _not_ the client ID (`id`) or the FxA device ID
+ /// (`fxa_device_id`).
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub device: Option<String>,
+}
+
+impl From<&ClientRecord> for crate::RemoteClient {
+ fn from(record: &ClientRecord) -> crate::RemoteClient {
+ crate::RemoteClient {
+ fxa_device_id: record.fxa_device_id.clone(),
+ device_name: record.name.clone(),
+ device_type: record.typ,
+ }
+ }
+}
+
+/// The serialized form of a client command.
+#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CommandRecord {
+ /// The command name. This is a string, not an enum, because we want to
+ /// round-trip commands that we don't support yet.
+ #[serde(rename = "command")]
+ pub name: String,
+
+ /// Extra, command-specific arguments. Note that we must send an empty
+ /// array if the command expects no arguments.
+ #[serde(default)]
+ pub args: Vec<String>,
+
+ /// Some commands, like repair, send a "flow ID" that other cliennts can
+ /// record in their telemetry. We don't currently send commands with
+ /// flow IDs, but we round-trip them.
+ #[serde(default, rename = "flowID", skip_serializing_if = "Option::is_none")]
+ pub flow_id: Option<String>,
+}
+
+impl CommandRecord {
+ /// Converts a serialized command into one that we can apply. Returns `None`
+ /// if we don't support the command.
+ pub fn as_command(&self) -> Option<Command> {
+ match self.name.as_str() {
+ "wipeEngine" => self.args.get(0).map(|e| Command::Wipe(e.into())),
+ "resetEngine" => self.args.get(0).map(|e| Command::Reset(e.into())),
+ "resetAll" => Some(Command::ResetAll),
+ _ => None,
+ }
+ }
+}
+
+impl From<Command> for CommandRecord {
+ fn from(command: Command) -> CommandRecord {
+ match command {
+ Command::Wipe(engine) => CommandRecord {
+ name: "wipeEngine".into(),
+ args: vec![engine],
+ flow_id: None,
+ },
+ Command::Reset(engine) => CommandRecord {
+ name: "resetEngine".into(),
+ args: vec![engine],
+ flow_id: None,
+ },
+ Command::ResetAll => CommandRecord {
+ name: "resetAll".into(),
+ args: Vec::new(),
+ flow_id: None,
+ },
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/clients_engine/ser.rs b/third_party/rust/sync15/src/clients_engine/ser.rs
new file mode 100644
index 0000000000..2e7b0817b8
--- /dev/null
+++ b/third_party/rust/sync15/src/clients_engine/ser.rs
@@ -0,0 +1,125 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::Result;
+use serde::Serialize;
+use std::io::{self, Write};
+
+/// A writer that counts the number of bytes it's asked to write, and discards
+/// the data. Used to calculate the serialized size of the commands list.
+#[derive(Clone, Copy, Default)]
+pub struct WriteCount(usize);
+
+impl WriteCount {
+ #[inline]
+ pub fn len(self) -> usize {
+ self.0
+ }
+}
+
+impl Write for WriteCount {
+ #[inline]
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0 += buf.len();
+ Ok(buf.len())
+ }
+
+ #[inline]
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+/// Returns the size of the given value, in bytes, when serialized to JSON.
+fn compute_serialized_size<T: Serialize>(value: &T) -> Result<usize> {
+ let mut w = WriteCount::default();
+ serde_json::to_writer(&mut w, value)?;
+ Ok(w.len())
+}
+
+/// Truncates `list` to fit within `payload_size_max_bytes` when serialized to
+/// JSON.
+pub fn shrink_to_fit<T: Serialize>(list: &mut Vec<T>, payload_size_max_bytes: usize) -> Result<()> {
+ let size = compute_serialized_size(&list)?;
+ // See bug 535326 comment 8 for an explanation of the estimation
+ match ((payload_size_max_bytes / 4) * 3).checked_sub(1500) {
+ Some(max_serialized_size) => {
+ if size > max_serialized_size {
+ // Estimate a little more than the direct fraction to maximize packing
+ let cutoff = (list.len() * max_serialized_size - 1) / size + 1;
+ list.truncate(cutoff + 1);
+ // Keep dropping off the last entry until the data fits.
+ while compute_serialized_size(&list)? > max_serialized_size {
+ if list.pop().is_none() {
+ break;
+ }
+ }
+ }
+ Ok(())
+ }
+ None => {
+ list.clear();
+ Ok(())
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::super::record::CommandRecord;
+ use super::*;
+
+ #[test]
+ fn test_compute_serialized_size() {
+ assert_eq!(compute_serialized_size(&1).unwrap(), 1);
+ assert_eq!(compute_serialized_size(&"hi").unwrap(), 4);
+ assert_eq!(
+ compute_serialized_size(&["hi", "hello", "bye"]).unwrap(),
+ 20
+ );
+ }
+
+ #[test]
+ fn test_shrink_to_fit() {
+ let mut commands = vec![
+ CommandRecord {
+ name: "wipeEngine".into(),
+ args: vec!["bookmarks".into()],
+ flow_id: Some("flow".into()),
+ },
+ CommandRecord {
+ name: "resetEngine".into(),
+ args: vec!["history".into()],
+ flow_id: Some("flow".into()),
+ },
+ CommandRecord {
+ name: "logout".into(),
+ args: Vec::new(),
+ flow_id: None,
+ },
+ ];
+
+ // 4096 bytes is enough to fit all three commands.
+ shrink_to_fit(&mut commands, 4096).unwrap();
+ assert_eq!(commands.len(), 3);
+
+ let sizes = commands
+ .iter()
+ .map(|c| compute_serialized_size(c).unwrap())
+ .collect::<Vec<_>>();
+ assert_eq!(sizes, &[61, 60, 30]);
+
+ // `logout` won't fit within 2168 bytes.
+ shrink_to_fit(&mut commands, 2168).unwrap();
+ assert_eq!(commands.len(), 2);
+
+ // `resetEngine` won't fit within 2084 bytes.
+ shrink_to_fit(&mut commands, 2084).unwrap();
+ assert_eq!(commands.len(), 1);
+
+ // `wipeEngine` won't fit at all.
+ shrink_to_fit(&mut commands, 1024).unwrap();
+ assert!(commands.is_empty());
+ }
+}