diff options
Diffstat (limited to '')
-rw-r--r-- | services/sync/modules/engines.sys.mjs | 2260 | ||||
-rw-r--r-- | services/sync/modules/engines/addons.sys.mjs | 820 | ||||
-rw-r--r-- | services/sync/modules/engines/bookmarks.sys.mjs | 953 | ||||
-rw-r--r-- | services/sync/modules/engines/clients.sys.mjs | 1123 | ||||
-rw-r--r-- | services/sync/modules/engines/extension-storage.sys.mjs | 303 | ||||
-rw-r--r-- | services/sync/modules/engines/forms.sys.mjs | 298 | ||||
-rw-r--r-- | services/sync/modules/engines/history.sys.mjs | 585 | ||||
-rw-r--r-- | services/sync/modules/engines/passwords.sys.mjs | 565 | ||||
-rw-r--r-- | services/sync/modules/engines/prefs.sys.mjs | 467 | ||||
-rw-r--r-- | services/sync/modules/engines/tabs.sys.mjs | 624 |
10 files changed, 7998 insertions, 0 deletions
diff --git a/services/sync/modules/engines.sys.mjs b/services/sync/modules/engines.sys.mjs new file mode 100644 index 0000000000..dc3fa185b2 --- /dev/null +++ b/services/sync/modules/engines.sys.mjs @@ -0,0 +1,2260 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; + +import { JSONFile } from "resource://gre/modules/JSONFile.sys.mjs"; +import { Log } from "resource://gre/modules/Log.sys.mjs"; + +import { Async } from "resource://services-common/async.sys.mjs"; +import { Observers } from "resource://services-common/observers.sys.mjs"; + +import { + DEFAULT_DOWNLOAD_BATCH_SIZE, + DEFAULT_GUID_FETCH_BATCH_SIZE, + ENGINE_BATCH_INTERRUPTED, + ENGINE_DOWNLOAD_FAIL, + ENGINE_UPLOAD_FAIL, + VERSION_OUT_OF_DATE, +} from "resource://services-sync/constants.sys.mjs"; + +import { + Collection, + CryptoWrapper, +} from "resource://services-sync/record.sys.mjs"; +import { Resource } from "resource://services-sync/resource.sys.mjs"; +import { + SerializableSet, + Svc, + Utils, +} from "resource://services-sync/util.sys.mjs"; +import { SyncedRecordsTelemetry } from "resource://services-sync/telemetry.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", +}); + +function ensureDirectory(path) { + return IOUtils.makeDirectory(PathUtils.parent(path), { + createAncestors: true, + }); +} + +/** + * Trackers are associated with a single engine and deal with + * listening for changes to their particular data type. + * + * The base `Tracker` only supports listening for changes, and bumping the score + * to indicate how urgently the engine wants to sync. It does not persist any + * data. Engines that track changes directly in the storage layer (like + * bookmarks, bridged engines, addresses, and credit cards) or only upload a + * single record (tabs and preferences) should subclass `Tracker`. + */ +export function Tracker(name, engine) { + if (!engine) { + throw new Error("Tracker must be associated with an Engine instance."); + } + + name = name || "Unnamed"; + this.name = name.toLowerCase(); + this.engine = engine; + + this._log = Log.repository.getLogger(`Sync.Engine.${name}.Tracker`); + + this._score = 0; + + this.asyncObserver = Async.asyncObserver(this, this._log); +} + +Tracker.prototype = { + // New-style trackers use change sources to filter out changes made by Sync in + // observer notifications, so we don't want to let the engine ignore all + // changes during a sync. + get ignoreAll() { + return false; + }, + + // Define an empty setter so that the engine doesn't throw a `TypeError` + // setting a read-only property. + set ignoreAll(value) {}, + + /* + * Score can be called as often as desired to decide which engines to sync + * + * Valid values for score: + * -1: Do not sync unless the user specifically requests it (almost disabled) + * 0: Nothing has changed + * 100: Please sync me ASAP! + * + * Setting it to other values should (but doesn't currently) throw an exception + */ + get score() { + return this._score; + }, + + set score(value) { + this._score = value; + Observers.notify("weave:engine:score:updated", this.name); + }, + + // Should be called by service everytime a sync has been done for an engine + resetScore() { + this._score = 0; + }, + + // Unsupported, and throws a more descriptive error to ensure callers aren't + // accidentally using persistence. + async getChangedIDs() { + throw new TypeError("This tracker doesn't store changed IDs"); + }, + + // Also unsupported. + async addChangedID(id, when) { + throw new TypeError("Can't add changed ID to this tracker"); + }, + + // Ditto. + async removeChangedID(...ids) { + throw new TypeError("Can't remove changed IDs from this tracker"); + }, + + // This method is called at various times, so we override with a no-op + // instead of throwing. + clearChangedIDs() {}, + + _now() { + return Date.now() / 1000; + }, + + _isTracking: false, + + start() { + if (!this.engineIsEnabled()) { + return; + } + this._log.trace("start()."); + if (!this._isTracking) { + this.onStart(); + this._isTracking = true; + } + }, + + async stop() { + this._log.trace("stop()."); + if (this._isTracking) { + await this.asyncObserver.promiseObserversComplete(); + this.onStop(); + this._isTracking = false; + } + }, + + // Override these in your subclasses. + onStart() {}, + onStop() {}, + async observe(subject, topic, data) {}, + + engineIsEnabled() { + if (!this.engine) { + // Can't tell -- we must be running in a test! + return true; + } + return this.engine.enabled; + }, + + /** + * Starts or stops listening for changes depending on the associated engine's + * enabled state. + * + * @param {Boolean} engineEnabled Whether the engine was enabled. + */ + async onEngineEnabledChanged(engineEnabled) { + if (engineEnabled == this._isTracking) { + return; + } + + if (engineEnabled) { + this.start(); + } else { + await this.stop(); + this.clearChangedIDs(); + } + }, + + async finalize() { + await this.stop(); + }, +}; + +/* + * A tracker that persists a list of IDs for all changed items that need to be + * synced. This is 🚨 _extremely deprecated_ 🚨 and only kept around for current + * engines. ⚠️ Please **don't use it** for new engines! ⚠️ + * + * Why is this kind of external change tracking deprecated? Because it causes + * consistency issues due to missed notifications, interrupted syncs, and the + * tracker's view of what changed diverging from the data store's. + */ +export function LegacyTracker(name, engine) { + Tracker.call(this, name, engine); + + this._ignored = []; + this.file = this.name; + this._storage = new JSONFile({ + path: Utils.jsonFilePath("changes", this.file), + dataPostProcessor: json => this._dataPostProcessor(json), + beforeSave: () => this._beforeSave(), + }); + this._ignoreAll = false; +} + +LegacyTracker.prototype = { + get ignoreAll() { + return this._ignoreAll; + }, + + set ignoreAll(value) { + this._ignoreAll = value; + }, + + // Default to an empty object if the file doesn't exist. + _dataPostProcessor(json) { + return (typeof json == "object" && json) || {}; + }, + + // Ensure the Weave storage directory exists before writing the file. + _beforeSave() { + return ensureDirectory(this._storage.path); + }, + + async getChangedIDs() { + await this._storage.load(); + return this._storage.data; + }, + + _saveChangedIDs() { + this._storage.saveSoon(); + }, + + // ignore/unignore specific IDs. Useful for ignoring items that are + // being processed, or that shouldn't be synced. + // But note: not persisted to disk + + ignoreID(id) { + this.unignoreID(id); + this._ignored.push(id); + }, + + unignoreID(id) { + let index = this._ignored.indexOf(id); + if (index != -1) { + this._ignored.splice(index, 1); + } + }, + + async _saveChangedID(id, when) { + this._log.trace(`Adding changed ID: ${id}, ${JSON.stringify(when)}`); + const changedIDs = await this.getChangedIDs(); + changedIDs[id] = when; + this._saveChangedIDs(); + }, + + async addChangedID(id, when) { + if (!id) { + this._log.warn("Attempted to add undefined ID to tracker"); + return false; + } + + if (this.ignoreAll || this._ignored.includes(id)) { + return false; + } + + // Default to the current time in seconds if no time is provided. + if (when == null) { + when = this._now(); + } + + const changedIDs = await this.getChangedIDs(); + // Add/update the entry if we have a newer time. + if ((changedIDs[id] || -Infinity) < when) { + await this._saveChangedID(id, when); + } + + return true; + }, + + async removeChangedID(...ids) { + if (!ids.length || this.ignoreAll) { + return false; + } + for (let id of ids) { + if (!id) { + this._log.warn("Attempted to remove undefined ID from tracker"); + continue; + } + if (this._ignored.includes(id)) { + this._log.debug(`Not removing ignored ID ${id} from tracker`); + continue; + } + const changedIDs = await this.getChangedIDs(); + if (changedIDs[id] != null) { + this._log.trace("Removing changed ID " + id); + delete changedIDs[id]; + } + } + this._saveChangedIDs(); + return true; + }, + + clearChangedIDs() { + this._log.trace("Clearing changed ID list"); + this._storage.data = {}; + this._saveChangedIDs(); + }, + + async finalize() { + // Persist all pending tracked changes to disk, and wait for the final write + // to finish. + await super.finalize(); + this._saveChangedIDs(); + await this._storage.finalize(); + }, +}; +Object.setPrototypeOf(LegacyTracker.prototype, Tracker.prototype); + +/** + * The Store serves as the interface between Sync and stored data. + * + * The name "store" is slightly a misnomer because it doesn't actually "store" + * anything. Instead, it serves as a gateway to something that actually does + * the "storing." + * + * The store is responsible for record management inside an engine. It tells + * Sync what items are available for Sync, converts items to and from Sync's + * record format, and applies records from Sync into changes on the underlying + * store. + * + * Store implementations require a number of functions to be implemented. These + * are all documented below. + * + * For stores that deal with many records or which have expensive store access + * routines, it is highly recommended to implement a custom applyIncomingBatch + * and/or applyIncoming function on top of the basic APIs. + */ + +export function Store(name, engine) { + if (!engine) { + throw new Error("Store must be associated with an Engine instance."); + } + + name = name || "Unnamed"; + this.name = name.toLowerCase(); + this.engine = engine; + + this._log = Log.repository.getLogger(`Sync.Engine.${name}.Store`); + + XPCOMUtils.defineLazyGetter(this, "_timer", function () { + return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer); + }); +} + +Store.prototype = { + /** + * Apply multiple incoming records against the store. + * + * This is called with a set of incoming records to process. The function + * should look at each record, reconcile with the current local state, and + * make the local changes required to bring its state in alignment with the + * record. + * + * The default implementation simply iterates over all records and calls + * applyIncoming(). Store implementations may overwrite this function + * if desired. + * + * @param records Array of records to apply + * @param a SyncedRecordsTelemetry obj that will keep track of failed reasons + * @return Array of record IDs which did not apply cleanly + */ + async applyIncomingBatch(records, countTelemetry) { + let failed = []; + + await Async.yieldingForEach(records, async record => { + try { + await this.applyIncoming(record); + } catch (ex) { + if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) { + // This kind of exception should have a 'cause' attribute, which is an + // originating exception. + // ex.cause will carry its stack with it when rethrown. + throw ex.cause; + } + if (Async.isShutdownException(ex)) { + throw ex; + } + this._log.warn("Failed to apply incoming record " + record.id, ex); + failed.push(record.id); + countTelemetry.addIncomingFailedReason(ex.message); + } + }); + + return failed; + }, + + /** + * Apply a single record against the store. + * + * This takes a single record and makes the local changes required so the + * local state matches what's in the record. + * + * The default implementation calls one of remove(), create(), or update() + * depending on the state obtained from the store itself. Store + * implementations may overwrite this function if desired. + * + * @param record + * Record to apply + */ + async applyIncoming(record) { + if (record.deleted) { + await this.remove(record); + } else if (!(await this.itemExists(record.id))) { + await this.create(record); + } else { + await this.update(record); + } + }, + + // override these in derived objects + + /** + * Create an item in the store from a record. + * + * This is called by the default implementation of applyIncoming(). If using + * applyIncomingBatch(), this won't be called unless your store calls it. + * + * @param record + * The store record to create an item from + */ + async create(record) { + throw new Error("override create in a subclass"); + }, + + /** + * Remove an item in the store from a record. + * + * This is called by the default implementation of applyIncoming(). If using + * applyIncomingBatch(), this won't be called unless your store calls it. + * + * @param record + * The store record to delete an item from + */ + async remove(record) { + throw new Error("override remove in a subclass"); + }, + + /** + * Update an item from a record. + * + * This is called by the default implementation of applyIncoming(). If using + * applyIncomingBatch(), this won't be called unless your store calls it. + * + * @param record + * The record to use to update an item from + */ + async update(record) { + throw new Error("override update in a subclass"); + }, + + /** + * Determine whether a record with the specified ID exists. + * + * Takes a string record ID and returns a booleans saying whether the record + * exists. + * + * @param id + * string record ID + * @return boolean indicating whether record exists locally + */ + async itemExists(id) { + throw new Error("override itemExists in a subclass"); + }, + + /** + * Create a record from the specified ID. + * + * If the ID is known, the record should be populated with metadata from + * the store. If the ID is not known, the record should be created with the + * delete field set to true. + * + * @param id + * string record ID + * @param collection + * Collection to add record to. This is typically passed into the + * constructor for the newly-created record. + * @return record type for this engine + */ + async createRecord(id, collection) { + throw new Error("override createRecord in a subclass"); + }, + + /** + * Change the ID of a record. + * + * @param oldID + * string old/current record ID + * @param newID + * string new record ID + */ + async changeItemID(oldID, newID) { + throw new Error("override changeItemID in a subclass"); + }, + + /** + * Obtain the set of all known record IDs. + * + * @return Object with ID strings as keys and values of true. The values + * are ignored. + */ + async getAllIDs() { + throw new Error("override getAllIDs in a subclass"); + }, + + /** + * Wipe all data in the store. + * + * This function is called during remote wipes or when replacing local data + * with remote data. + * + * This function should delete all local data that the store is managing. It + * can be thought of as clearing out all state and restoring the "new + * browser" state. + */ + async wipe() { + throw new Error("override wipe in a subclass"); + }, +}; + +export function EngineManager(service) { + this.service = service; + + this._engines = {}; + + this._altEngineInfo = {}; + + // This will be populated by Service on startup. + this._declined = new Set(); + this._log = Log.repository.getLogger("Sync.EngineManager"); + this._log.manageLevelFromPref("services.sync.log.logger.service.engines"); + // define the default level for all engine logs here (although each engine + // allows its level to be controlled via a specific, non-default pref) + Log.repository + .getLogger(`Sync.Engine`) + .manageLevelFromPref("services.sync.log.logger.engine"); +} + +EngineManager.prototype = { + get(name) { + // Return an array of engines if we have an array of names + if (Array.isArray(name)) { + let engines = []; + name.forEach(function (name) { + let engine = this.get(name); + if (engine) { + engines.push(engine); + } + }, this); + return engines; + } + + return this._engines[name]; // Silently returns undefined for unknown names. + }, + + getAll() { + let engines = []; + for (let [, engine] of Object.entries(this._engines)) { + engines.push(engine); + } + return engines; + }, + + /** + * If a user has changed a pref that controls which variant of a sync engine + * for a given collection we use, unregister the old engine and register the + * new one. + * + * This is called by EngineSynchronizer before every sync. + */ + async switchAlternatives() { + for (let [name, info] of Object.entries(this._altEngineInfo)) { + let prefValue = info.prefValue; + if (prefValue === info.lastValue) { + this._log.trace( + `No change for engine ${name} (${info.pref} is still ${prefValue})` + ); + continue; + } + // Unregister the old engine, register the new one. + this._log.info( + `Switching ${name} engine ("${info.pref}" went from ${info.lastValue} => ${prefValue})` + ); + try { + await this._removeAndFinalize(name); + } catch (e) { + this._log.warn(`Failed to remove previous ${name} engine...`, e); + } + let engineType = prefValue ? info.whenTrue : info.whenFalse; + try { + // If register throws, we'll try again next sync, but until then there + // won't be an engine registered for this collection. + await this.register(engineType); + info.lastValue = prefValue; + // Note: engineType.name is using Function.prototype.name. + this._log.info(`Switched the ${name} engine to use ${engineType.name}`); + } catch (e) { + this._log.warn( + `Switching the ${name} engine to use ${engineType.name} failed (couldn't register)`, + e + ); + } + } + }, + + async registerAlternatives(name, pref, whenTrue, whenFalse) { + let info = { name, pref, whenTrue, whenFalse }; + + XPCOMUtils.defineLazyPreferenceGetter(info, "prefValue", pref, false); + + let chosen = info.prefValue ? info.whenTrue : info.whenFalse; + info.lastValue = info.prefValue; + this._altEngineInfo[name] = info; + + await this.register(chosen); + }, + + /** + * N.B., does not pay attention to the declined list. + */ + getEnabled() { + return this.getAll() + .filter(engine => engine.enabled) + .sort((a, b) => a.syncPriority - b.syncPriority); + }, + + get enabledEngineNames() { + return this.getEnabled().map(e => e.name); + }, + + persistDeclined() { + Svc.Prefs.set("declinedEngines", [...this._declined].join(",")); + }, + + /** + * Returns an array. + */ + getDeclined() { + return [...this._declined]; + }, + + setDeclined(engines) { + this._declined = new Set(engines); + this.persistDeclined(); + }, + + isDeclined(engineName) { + return this._declined.has(engineName); + }, + + /** + * Accepts a Set or an array. + */ + decline(engines) { + for (let e of engines) { + this._declined.add(e); + } + this.persistDeclined(); + }, + + undecline(engines) { + for (let e of engines) { + this._declined.delete(e); + } + this.persistDeclined(); + }, + + /** + * Register an Engine to the service. Alternatively, give an array of engine + * objects to register. + * + * @param engineObject + * Engine object used to get an instance of the engine + * @return The engine object if anything failed + */ + async register(engineObject) { + if (Array.isArray(engineObject)) { + for (const e of engineObject) { + await this.register(e); + } + return; + } + + try { + let engine = new engineObject(this.service); + let name = engine.name; + if (name in this._engines) { + this._log.error("Engine '" + name + "' is already registered!"); + } else { + if (engine.initialize) { + await engine.initialize(); + } + this._engines[name] = engine; + } + } catch (ex) { + let name = engineObject || ""; + name = name.prototype || ""; + name = name.name || ""; + + this._log.error(`Could not initialize engine ${name}`, ex); + } + }, + + async unregister(val) { + let name = val; + if (val instanceof SyncEngine) { + name = val.name; + } + await this._removeAndFinalize(name); + delete this._altEngineInfo[name]; + }, + + // Common code for disabling an engine by name, that doesn't complain if the + // engine doesn't exist. Doesn't touch the engine's alternative info (if any + // exists). + async _removeAndFinalize(name) { + if (name in this._engines) { + let engine = this._engines[name]; + delete this._engines[name]; + await engine.finalize(); + } + }, + + async clear() { + for (let name in this._engines) { + let engine = this._engines[name]; + delete this._engines[name]; + await engine.finalize(); + } + this._altEngineInfo = {}; + }, +}; + +export function SyncEngine(name, service) { + if (!service) { + throw new Error("SyncEngine must be associated with a Service instance."); + } + + this.Name = name || "Unnamed"; + this.name = name.toLowerCase(); + this.service = service; + + this._notify = Utils.notify("weave:engine:"); + this._log = Log.repository.getLogger("Sync.Engine." + this.Name); + this._log.manageLevelFromPref(`services.sync.log.logger.engine.${this.name}`); + + this._modified = this.emptyChangeset(); + this._tracker; // initialize tracker to load previously changed IDs + this._log.debug("Engine constructed"); + + this._toFetchStorage = new JSONFile({ + path: Utils.jsonFilePath("toFetch", this.name), + dataPostProcessor: json => this._metadataPostProcessor(json), + beforeSave: () => this._beforeSaveMetadata(), + }); + + this._previousFailedStorage = new JSONFile({ + path: Utils.jsonFilePath("failed", this.name), + dataPostProcessor: json => this._metadataPostProcessor(json), + beforeSave: () => this._beforeSaveMetadata(), + }); + + XPCOMUtils.defineLazyPreferenceGetter( + this, + "_enabled", + `services.sync.engine.${this.prefName}`, + false + ); + XPCOMUtils.defineLazyPreferenceGetter( + this, + "_syncID", + `services.sync.${this.name}.syncID`, + "" + ); + XPCOMUtils.defineLazyPreferenceGetter( + this, + "_lastSync", + `services.sync.${this.name}.lastSync`, + "0", + null, + v => parseFloat(v) + ); + // Async initializations can be made in the initialize() method. + + this.asyncObserver = Async.asyncObserver(this, this._log); +} + +// Enumeration to define approaches to handling bad records. +// Attached to the constructor to allow use as a kind of static enumeration. +SyncEngine.kRecoveryStrategy = { + ignore: "ignore", + retry: "retry", + error: "error", +}; + +SyncEngine.prototype = { + _recordObj: CryptoWrapper, + // _storeObj, and _trackerObj should to be overridden in subclasses + _storeObj: Store, + _trackerObj: Tracker, + version: 1, + + // Local 'constant'. + // Signal to the engine that processing further records is pointless. + eEngineAbortApplyIncoming: "error.engine.abort.applyincoming", + + // Should we keep syncing if we find a record that cannot be uploaded (ever)? + // If this is false, we'll throw, otherwise, we'll ignore the record and + // continue. This currently can only happen due to the record being larger + // than the record upload limit. + allowSkippedRecord: true, + + // Which sortindex to use when retrieving records for this engine. + _defaultSort: undefined, + + _hasSyncedThisSession: false, + + _metadataPostProcessor(json) { + if (Array.isArray(json)) { + // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to + // an object, so we wrap the array for consistency. + json = { ids: json }; + } + if (!json.ids) { + json.ids = []; + } + // The set serializes the same way as an array, but offers more efficient + // methods of manipulation. + json.ids = new SerializableSet(json.ids); + return json; + }, + + async _beforeSaveMetadata() { + await ensureDirectory(this._toFetchStorage.path); + await ensureDirectory(this._previousFailedStorage.path); + }, + + // A relative priority to use when computing an order + // for engines to be synced. Higher-priority engines + // (lower numbers) are synced first. + // It is recommended that a unique value be used for each engine, + // in order to guarantee a stable sequence. + syncPriority: 0, + + // How many records to pull in a single sync. This is primarily to avoid very + // long first syncs against profiles with many history records. + downloadLimit: null, + + // How many records to pull at one time when specifying IDs. This is to avoid + // URI length limitations. + guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE, + + downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE, + + async initialize() { + await this._toFetchStorage.load(); + await this._previousFailedStorage.load(); + Svc.Prefs.observe(`engine.${this.prefName}`, this.asyncObserver); + this._log.debug("SyncEngine initialized", this.name); + }, + + get prefName() { + return this.name; + }, + + get enabled() { + return this._enabled; + }, + + set enabled(val) { + if (!!val != this._enabled) { + Svc.Prefs.set("engine." + this.prefName, !!val); + } + }, + + get score() { + return this._tracker.score; + }, + + get _store() { + let store = new this._storeObj(this.Name, this); + this.__defineGetter__("_store", () => store); + return store; + }, + + get _tracker() { + let tracker = new this._trackerObj(this.Name, this); + this.__defineGetter__("_tracker", () => tracker); + return tracker; + }, + + get storageURL() { + return this.service.storageURL; + }, + + get engineURL() { + return this.storageURL + this.name; + }, + + get cryptoKeysURL() { + return this.storageURL + "crypto/keys"; + }, + + get metaURL() { + return this.storageURL + "meta/global"; + }, + + startTracking() { + this._tracker.start(); + }, + + // Returns a promise + stopTracking() { + return this._tracker.stop(); + }, + + // Listens for engine enabled state changes, and updates the tracker's state. + // This is an async observer because the tracker waits on all its async + // observers to finish when it's stopped. + async observe(subject, topic, data) { + if ( + topic == "nsPref:changed" && + data == `services.sync.engine.${this.prefName}` + ) { + await this._tracker.onEngineEnabledChanged(this._enabled); + } + }, + + async sync() { + if (!this.enabled) { + return false; + } + + if (!this._sync) { + throw new Error("engine does not implement _sync method"); + } + + return this._notify("sync", this.name, this._sync)(); + }, + + // Override this method to return a new changeset type. + emptyChangeset() { + return new Changeset(); + }, + + /** + * Returns the local sync ID for this engine, or `""` if the engine hasn't + * synced for the first time. This is exposed for tests. + * + * @return the current sync ID. + */ + async getSyncID() { + return this._syncID; + }, + + /** + * Ensures that the local sync ID for the engine matches the sync ID for the + * collection on the server. A mismatch indicates that another client wiped + * the collection; we're syncing after a node reassignment, and another + * client synced before us; or the store was replaced since the last sync. + * In case of a mismatch, we need to reset all local Sync state and start + * over as a first sync. + * + * In most cases, this method should return the new sync ID as-is. However, an + * engine may ignore the given ID and assign a different one, if it determines + * that the sync ID on the server is out of date. The bookmarks engine uses + * this to wipe the server and other clients on the first sync after the user + * restores from a backup. + * + * @param newSyncID + * The new sync ID for the collection from `meta/global`. + * @return The assigned sync ID. If this doesn't match `newSyncID`, we'll + * replace the sync ID in `meta/global` with the assigned ID. + */ + async ensureCurrentSyncID(newSyncID) { + let existingSyncID = this._syncID; + if (existingSyncID == newSyncID) { + return existingSyncID; + } + this._log.debug("Engine syncIDs: " + [newSyncID, existingSyncID]); + Svc.Prefs.set(this.name + ".syncID", newSyncID); + Svc.Prefs.set(this.name + ".lastSync", "0"); + return newSyncID; + }, + + /** + * Resets the local sync ID for the engine, wipes the server, and resets all + * local Sync state to start over as a first sync. + * + * @return the new sync ID. + */ + async resetSyncID() { + let newSyncID = await this.resetLocalSyncID(); + await this.wipeServer(); + return newSyncID; + }, + + /** + * Resets the local sync ID for the engine, signaling that we're starting over + * as a first sync. + * + * @return the new sync ID. + */ + async resetLocalSyncID() { + return this.ensureCurrentSyncID(Utils.makeGUID()); + }, + + /** + * Allows overriding scheduler logic -- added to help reduce kinto server + * getting hammered because our scheduler never got tuned for it. + * + * Note: Overriding engines must take resyncs into account -- score will not + * be cleared. + */ + shouldSkipSync(syncReason) { + return false; + }, + + /* + * lastSync is a timestamp in server time. + */ + async getLastSync() { + return this._lastSync; + }, + async setLastSync(lastSync) { + // Store the value as a string to keep floating point precision + Svc.Prefs.set(this.name + ".lastSync", lastSync.toString()); + }, + async resetLastSync() { + this._log.debug("Resetting " + this.name + " last sync time"); + await this.setLastSync(0); + }, + + get hasSyncedThisSession() { + return this._hasSyncedThisSession; + }, + + set hasSyncedThisSession(hasSynced) { + this._hasSyncedThisSession = hasSynced; + }, + + get toFetch() { + this._toFetchStorage.ensureDataReady(); + return this._toFetchStorage.data.ids; + }, + + set toFetch(ids) { + if (ids.constructor.name != "SerializableSet") { + throw new Error( + "Bug: Attempted to set toFetch to something that isn't a SerializableSet" + ); + } + this._toFetchStorage.data = { ids }; + this._toFetchStorage.saveSoon(); + }, + + get previousFailed() { + this._previousFailedStorage.ensureDataReady(); + return this._previousFailedStorage.data.ids; + }, + + set previousFailed(ids) { + if (ids.constructor.name != "SerializableSet") { + throw new Error( + "Bug: Attempted to set previousFailed to something that isn't a SerializableSet" + ); + } + this._previousFailedStorage.data = { ids }; + this._previousFailedStorage.saveSoon(); + }, + + /* + * Returns a changeset for this sync. Engine implementations can override this + * method to bypass the tracker for certain or all changed items. + */ + async getChangedIDs() { + return this._tracker.getChangedIDs(); + }, + + // Create a new record using the store and add in metadata. + async _createRecord(id) { + let record = await this._store.createRecord(id, this.name); + record.id = id; + record.collection = this.name; + return record; + }, + + // Creates a tombstone Sync record with additional metadata. + _createTombstone(id) { + let tombstone = new this._recordObj(this.name, id); + tombstone.id = id; + tombstone.collection = this.name; + tombstone.deleted = true; + return tombstone; + }, + + // Any setup that needs to happen at the beginning of each sync. + async _syncStartup() { + // Determine if we need to wipe on outdated versions + let metaGlobal = await this.service.recordManager.get(this.metaURL); + let engines = metaGlobal.payload.engines || {}; + let engineData = engines[this.name] || {}; + + // Assume missing versions are 0 and wipe the server + if ((engineData.version || 0) < this.version) { + this._log.debug("Old engine data: " + [engineData.version, this.version]); + + // Clear the server and reupload everything on bad version or missing + // meta. Note that we don't regenerate per-collection keys here. + let newSyncID = await this.resetSyncID(); + + // Set the newer version and newly generated syncID + engineData.version = this.version; + engineData.syncID = newSyncID; + + // Put the new data back into meta/global and mark for upload + engines[this.name] = engineData; + metaGlobal.payload.engines = engines; + metaGlobal.changed = true; + } else if (engineData.version > this.version) { + // Don't sync this engine if the server has newer data + + let error = new Error("New data: " + [engineData.version, this.version]); + error.failureCode = VERSION_OUT_OF_DATE; + throw error; + } else { + // Changes to syncID mean we'll need to upload everything + let assignedSyncID = await this.ensureCurrentSyncID(engineData.syncID); + if (assignedSyncID != engineData.syncID) { + engineData.syncID = assignedSyncID; + metaGlobal.changed = true; + } + } + + // Save objects that need to be uploaded in this._modified. As we + // successfully upload objects we remove them from this._modified. If an + // error occurs or any objects fail to upload, they will remain in + // this._modified. At the end of a sync, or after an error, we add all + // objects remaining in this._modified to the tracker. + let initialChanges = await this.pullChanges(); + this._modified.replace(initialChanges); + // Clear the tracker now. If the sync fails we'll add the ones we failed + // to upload back. + this._tracker.clearChangedIDs(); + this._tracker.resetScore(); + + // Keep track of what to delete at the end of sync + this._delete = {}; + }, + + async pullChanges() { + let lastSync = await this.getLastSync(); + if (lastSync) { + return this.pullNewChanges(); + } + this._log.debug("First sync, uploading all items"); + return this.pullAllChanges(); + }, + + /** + * A tiny abstraction to make it easier to test incoming record + * application. + */ + itemSource() { + return new Collection(this.engineURL, this._recordObj, this.service); + }, + + /** + * Download and apply remote records changed since the last sync. This + * happens in three stages. + * + * In the first stage, we fetch full records for all changed items, newest + * first, up to the download limit. The limit lets us make progress for large + * collections, where the sync is likely to be interrupted before we + * can fetch everything. + * + * In the second stage, we fetch the IDs of any remaining records changed + * since the last sync, add them to our backlog, and fast-forward our last + * sync time. + * + * In the third stage, we fetch and apply records for all backlogged IDs, + * as well as any records that failed to apply during the last sync. We + * request records for the IDs in chunks, to avoid exceeding URL length + * limits, then remove successfully applied records from the backlog, and + * record IDs of any records that failed to apply to retry on the next sync. + */ + async _processIncoming() { + this._log.trace("Downloading & applying server changes"); + + let newitems = this.itemSource(); + let lastSync = await this.getLastSync(); + + newitems.newer = lastSync; + newitems.full = true; + + let downloadLimit = Infinity; + if (this.downloadLimit) { + // Fetch new records up to the download limit. Currently, only the history + // engine sets a limit, since the history collection has the highest volume + // of changed records between syncs. The other engines fetch all records + // changed since the last sync. + if (this._defaultSort) { + // A download limit with a sort order doesn't make sense: we won't know + // which records to backfill. + throw new Error("Can't specify download limit with default sort order"); + } + newitems.sort = "newest"; + downloadLimit = newitems.limit = this.downloadLimit; + } else if (this._defaultSort) { + // The bookmarks engine fetches records by sort index; other engines leave + // the order unspecified. We can remove `_defaultSort` entirely after bug + // 1305563: the sort index won't matter because we'll buffer all bookmarks + // before applying. + newitems.sort = this._defaultSort; + } + + // applied => number of items that should be applied. + // failed => number of items that failed in this sync. + // newFailed => number of items that failed for the first time in this sync. + // reconciled => number of items that were reconciled. + // failedReasons => {name, count} of reasons a record failed + let countTelemetry = new SyncedRecordsTelemetry(); + let count = countTelemetry.incomingCounts; + let recordsToApply = []; + let failedInCurrentSync = new SerializableSet(); + + let oldestModified = this.lastModified; + let downloadedIDs = new Set(); + + // Stage 1: Fetch new records from the server, up to the download limit. + if (this.lastModified == null || this.lastModified > lastSync) { + let { response, records } = await newitems.getBatched( + this.downloadBatchSize + ); + if (!response.success) { + response.failureCode = ENGINE_DOWNLOAD_FAIL; + throw response; + } + + await Async.yieldingForEach(records, async record => { + downloadedIDs.add(record.id); + + if (record.modified < oldestModified) { + oldestModified = record.modified; + } + + let { shouldApply, error } = await this._maybeReconcile(record); + if (error) { + failedInCurrentSync.add(record.id); + count.failed++; + countTelemetry.addIncomingFailedReason(error.message); + return; + } + if (!shouldApply) { + count.reconciled++; + return; + } + recordsToApply.push(record); + }); + + let failedToApply = await this._applyRecords( + recordsToApply, + countTelemetry + ); + Utils.setAddAll(failedInCurrentSync, failedToApply); + + // `applied` is a bit of a misnomer: it counts records that *should* be + // applied, so it also includes records that we tried to apply and failed. + // `recordsToApply.length - failedToApply.length` is the number of records + // that we *successfully* applied. + count.failed += failedToApply.length; + count.applied += recordsToApply.length; + } + + // Stage 2: If we reached our download limit, we might still have records + // on the server that changed since the last sync. Fetch the IDs for the + // remaining records, and add them to the backlog. Note that this stage + // only runs for engines that set a download limit. + if (downloadedIDs.size == downloadLimit) { + let guidColl = this.itemSource(); + + guidColl.newer = lastSync; + guidColl.older = oldestModified; + guidColl.sort = "oldest"; + + let guids = await guidColl.get(); + if (!guids.success) { + throw guids; + } + + // Filtering out already downloaded IDs here isn't necessary. We only do + // that in case the Sync server doesn't support `older` (bug 1316110). + let remainingIDs = guids.obj.filter(id => !downloadedIDs.has(id)); + if (remainingIDs.length) { + this.toFetch = Utils.setAddAll(this.toFetch, remainingIDs); + } + } + + // Fast-foward the lastSync timestamp since we have backlogged the + // remaining items. + if (lastSync < this.lastModified) { + lastSync = this.lastModified; + await this.setLastSync(lastSync); + } + + // Stage 3: Backfill records from the backlog, and those that failed to + // decrypt or apply during the last sync. We only backfill up to the + // download limit, to prevent a large backlog for one engine from blocking + // the others. We'll keep processing the backlog on subsequent engine syncs. + let failedInPreviousSync = this.previousFailed; + let idsToBackfill = Array.from( + Utils.setAddAll( + Utils.subsetOfSize(this.toFetch, downloadLimit), + failedInPreviousSync + ) + ); + + // Note that we intentionally overwrite the previously failed list here. + // Records that fail to decrypt or apply in two consecutive syncs are likely + // corrupt; we remove them from the list because retrying and failing on + // every subsequent sync just adds noise. + this.previousFailed = failedInCurrentSync; + + let backfilledItems = this.itemSource(); + + backfilledItems.sort = "newest"; + backfilledItems.full = true; + + // `getBatched` includes the list of IDs as a query parameter, so we need to fetch + // records in chunks to avoid exceeding URI length limits. + if (this.guidFetchBatchSize) { + for (let ids of lazy.PlacesUtils.chunkArray( + idsToBackfill, + this.guidFetchBatchSize + )) { + backfilledItems.ids = ids; + + let { response, records } = await backfilledItems.getBatched( + this.downloadBatchSize + ); + if (!response.success) { + response.failureCode = ENGINE_DOWNLOAD_FAIL; + throw response; + } + + let backfilledRecordsToApply = []; + let failedInBackfill = []; + + await Async.yieldingForEach(records, async record => { + let { shouldApply, error } = await this._maybeReconcile(record); + if (error) { + failedInBackfill.push(record.id); + count.failed++; + countTelemetry.addIncomingFailedReason(error.message); + return; + } + if (!shouldApply) { + count.reconciled++; + return; + } + backfilledRecordsToApply.push(record); + }); + + let failedToApply = await this._applyRecords( + backfilledRecordsToApply, + countTelemetry + ); + failedInBackfill.push(...failedToApply); + + count.failed += failedToApply.length; + count.applied += backfilledRecordsToApply.length; + + this.toFetch = Utils.setDeleteAll(this.toFetch, ids); + this.previousFailed = Utils.setAddAll( + this.previousFailed, + failedInBackfill + ); + + if (lastSync < this.lastModified) { + lastSync = this.lastModified; + await this.setLastSync(lastSync); + } + } + } + + count.newFailed = 0; + for (let item of this.previousFailed) { + // Anything that failed in the current sync that also failed in + // the previous sync means there is likely something wrong with + // the record, we remove it from trying again to prevent + // infinitely syncing corrupted records + if (failedInPreviousSync.has(item)) { + this.previousFailed.delete(item); + } else { + // otherwise it's a new failed and we count it as so + ++count.newFailed; + } + } + + count.succeeded = Math.max(0, count.applied - count.failed); + this._log.info( + [ + "Records:", + count.applied, + "applied,", + count.succeeded, + "successfully,", + count.failed, + "failed to apply,", + count.newFailed, + "newly failed to apply,", + count.reconciled, + "reconciled.", + ].join(" ") + ); + Observers.notify("weave:engine:sync:applied", count, this.name); + }, + + async _maybeReconcile(item) { + let key = this.service.collectionKeys.keyForCollection(this.name); + + // Grab a later last modified if possible + if (this.lastModified == null || item.modified > this.lastModified) { + this.lastModified = item.modified; + } + + try { + try { + await item.decrypt(key); + } catch (ex) { + if (!Utils.isHMACMismatch(ex)) { + throw ex; + } + let strategy = await this.handleHMACMismatch(item, true); + if (strategy == SyncEngine.kRecoveryStrategy.retry) { + // You only get one retry. + try { + // Try decrypting again, typically because we've got new keys. + this._log.info("Trying decrypt again..."); + key = this.service.collectionKeys.keyForCollection(this.name); + await item.decrypt(key); + strategy = null; + } catch (ex) { + if (!Utils.isHMACMismatch(ex)) { + throw ex; + } + strategy = await this.handleHMACMismatch(item, false); + } + } + + switch (strategy) { + case null: + // Retry succeeded! No further handling. + break; + case SyncEngine.kRecoveryStrategy.retry: + this._log.debug("Ignoring second retry suggestion."); + // Fall through to error case. + case SyncEngine.kRecoveryStrategy.error: + this._log.warn("Error decrypting record", ex); + return { shouldApply: false, error: ex }; + case SyncEngine.kRecoveryStrategy.ignore: + this._log.debug( + "Ignoring record " + item.id + " with bad HMAC: already handled." + ); + return { shouldApply: false, error: null }; + } + } + } catch (ex) { + if (Async.isShutdownException(ex)) { + throw ex; + } + this._log.warn("Error decrypting record", ex); + return { shouldApply: false, error: ex }; + } + + if (this._shouldDeleteRemotely(item)) { + this._log.trace("Deleting item from server without applying", item); + await this._deleteId(item.id); + return { shouldApply: false, error: null }; + } + + let shouldApply; + try { + shouldApply = await this._reconcile(item); + } catch (ex) { + if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) { + this._log.warn("Reconciliation failed: aborting incoming processing."); + throw ex.cause; + } else if (!Async.isShutdownException(ex)) { + this._log.warn("Failed to reconcile incoming record " + item.id, ex); + return { shouldApply: false, error: ex }; + } else { + throw ex; + } + } + + if (!shouldApply) { + this._log.trace("Skipping reconciled incoming item " + item.id); + } + + return { shouldApply, error: null }; + }, + + async _applyRecords(records, countTelemetry) { + this._tracker.ignoreAll = true; + try { + let failedIDs = await this._store.applyIncomingBatch( + records, + countTelemetry + ); + return failedIDs; + } catch (ex) { + // Catch any error that escapes from applyIncomingBatch. At present + // those will all be abort events. + this._log.warn("Got exception, aborting processIncoming", ex); + throw ex; + } finally { + this._tracker.ignoreAll = false; + } + }, + + // Indicates whether an incoming item should be deleted from the server at + // the end of the sync. Engines can override this method to clean up records + // that shouldn't be on the server. + _shouldDeleteRemotely(remoteItem) { + return false; + }, + + /** + * Find a GUID of an item that is a duplicate of the incoming item but happens + * to have a different GUID + * + * @return GUID of the similar item; falsy otherwise + */ + async _findDupe(item) { + // By default, assume there's no dupe items for the engine + }, + + /** + * Called before a remote record is discarded due to failed reconciliation. + * Used by bookmark sync to merge folder child orders. + */ + beforeRecordDiscard(localRecord, remoteRecord, remoteIsNewer) {}, + + // Called when the server has a record marked as deleted, but locally we've + // changed it more recently than the deletion. If we return false, the + // record will be deleted locally. If we return true, we'll reupload the + // record to the server -- any extra work that's needed as part of this + // process should be done at this point (such as mark the record's parent + // for reuploading in the case of bookmarks). + async _shouldReviveRemotelyDeletedRecord(remoteItem) { + return true; + }, + + async _deleteId(id) { + await this._tracker.removeChangedID(id); + this._noteDeletedId(id); + }, + + // Marks an ID for deletion at the end of the sync. + _noteDeletedId(id) { + if (this._delete.ids == null) { + this._delete.ids = [id]; + } else { + this._delete.ids.push(id); + } + }, + + async _switchItemToDupe(localDupeGUID, incomingItem) { + // The local, duplicate ID is always deleted on the server. + await this._deleteId(localDupeGUID); + + // We unconditionally change the item's ID in case the engine knows of + // an item but doesn't expose it through itemExists. If the API + // contract were stronger, this could be changed. + this._log.debug( + "Switching local ID to incoming: " + + localDupeGUID + + " -> " + + incomingItem.id + ); + return this._store.changeItemID(localDupeGUID, incomingItem.id); + }, + + /** + * Reconcile incoming record with local state. + * + * This function essentially determines whether to apply an incoming record. + * + * @param item + * Record from server to be tested for application. + * @return boolean + * Truthy if incoming record should be applied. False if not. + */ + async _reconcile(item) { + if (this._log.level <= Log.Level.Trace) { + this._log.trace("Incoming: " + item); + } + + // We start reconciling by collecting a bunch of state. We do this here + // because some state may change during the course of this function and we + // need to operate on the original values. + let existsLocally = await this._store.itemExists(item.id); + let locallyModified = this._modified.has(item.id); + + // TODO Handle clock drift better. Tracked in bug 721181. + let remoteAge = Resource.serverTime - item.modified; + let localAge = locallyModified + ? Date.now() / 1000 - this._modified.getModifiedTimestamp(item.id) + : null; + let remoteIsNewer = remoteAge < localAge; + + this._log.trace( + "Reconciling " + + item.id + + ". exists=" + + existsLocally + + "; modified=" + + locallyModified + + "; local age=" + + localAge + + "; incoming age=" + + remoteAge + ); + + // We handle deletions first so subsequent logic doesn't have to check + // deleted flags. + if (item.deleted) { + // If the item doesn't exist locally, there is nothing for us to do. We + // can't check for duplicates because the incoming record has no data + // which can be used for duplicate detection. + if (!existsLocally) { + this._log.trace( + "Ignoring incoming item because it was deleted and " + + "the item does not exist locally." + ); + return false; + } + + // We decide whether to process the deletion by comparing the record + // ages. If the item is not modified locally, the remote side wins and + // the deletion is processed. If it is modified locally, we take the + // newer record. + if (!locallyModified) { + this._log.trace( + "Applying incoming delete because the local item " + + "exists and isn't modified." + ); + return true; + } + this._log.trace("Incoming record is deleted but we had local changes."); + + if (remoteIsNewer) { + this._log.trace("Remote record is newer -- deleting local record."); + return true; + } + // If the local record is newer, we defer to individual engines for + // how to handle this. By default, we revive the record. + let willRevive = await this._shouldReviveRemotelyDeletedRecord(item); + this._log.trace("Local record is newer -- reviving? " + willRevive); + + return !willRevive; + } + + // At this point the incoming record is not for a deletion and must have + // data. If the incoming record does not exist locally, we check for a local + // duplicate existing under a different ID. The default implementation of + // _findDupe() is empty, so engines have to opt in to this functionality. + // + // If we find a duplicate, we change the local ID to the incoming ID and we + // refresh the metadata collected above. See bug 710448 for the history + // of this logic. + if (!existsLocally) { + let localDupeGUID = await this._findDupe(item); + if (localDupeGUID) { + this._log.trace( + "Local item " + + localDupeGUID + + " is a duplicate for " + + "incoming item " + + item.id + ); + + // The current API contract does not mandate that the ID returned by + // _findDupe() actually exists. Therefore, we have to perform this + // check. + existsLocally = await this._store.itemExists(localDupeGUID); + + // If the local item was modified, we carry its metadata forward so + // appropriate reconciling can be performed. + if (this._modified.has(localDupeGUID)) { + locallyModified = true; + localAge = + this._tracker._now() - + this._modified.getModifiedTimestamp(localDupeGUID); + remoteIsNewer = remoteAge < localAge; + + this._modified.changeID(localDupeGUID, item.id); + } else { + locallyModified = false; + localAge = null; + } + + // Tell the engine to do whatever it needs to switch the items. + await this._switchItemToDupe(localDupeGUID, item); + + this._log.debug( + "Local item after duplication: age=" + + localAge + + "; modified=" + + locallyModified + + "; exists=" + + existsLocally + ); + } else { + this._log.trace("No duplicate found for incoming item: " + item.id); + } + } + + // At this point we've performed duplicate detection. But, nothing here + // should depend on duplicate detection as the above should have updated + // state seamlessly. + + if (!existsLocally) { + // If the item doesn't exist locally and we have no local modifications + // to the item (implying that it was not deleted), always apply the remote + // item. + if (!locallyModified) { + this._log.trace( + "Applying incoming because local item does not exist " + + "and was not deleted." + ); + return true; + } + + // If the item was modified locally but isn't present, it must have + // been deleted. If the incoming record is younger, we restore from + // that record. + if (remoteIsNewer) { + this._log.trace( + "Applying incoming because local item was deleted " + + "before the incoming item was changed." + ); + this._modified.delete(item.id); + return true; + } + + this._log.trace( + "Ignoring incoming item because the local item's " + + "deletion is newer." + ); + return false; + } + + // If the remote and local records are the same, there is nothing to be + // done, so we don't do anything. In the ideal world, this logic wouldn't + // be here and the engine would take a record and apply it. The reason we + // want to defer this logic is because it would avoid a redundant and + // possibly expensive dip into the storage layer to query item state. + // This should get addressed in the async rewrite, so we ignore it for now. + let localRecord = await this._createRecord(item.id); + let recordsEqual = Utils.deepEquals(item.cleartext, localRecord.cleartext); + + // If the records are the same, we don't need to do anything. This does + // potentially throw away a local modification time. But, if the records + // are the same, does it matter? + if (recordsEqual) { + this._log.trace( + "Ignoring incoming item because the local item is identical." + ); + + this._modified.delete(item.id); + return false; + } + + // At this point the records are different. + + // If we have no local modifications, always take the server record. + if (!locallyModified) { + this._log.trace("Applying incoming record because no local conflicts."); + return true; + } + + // At this point, records are different and the local record is modified. + // We resolve conflicts by record age, where the newest one wins. This does + // result in data loss and should be handled by giving the engine an + // opportunity to merge the records. Bug 720592 tracks this feature. + this._log.warn( + "DATA LOSS: Both local and remote changes to record: " + item.id + ); + if (!remoteIsNewer) { + this.beforeRecordDiscard(localRecord, item, remoteIsNewer); + } + return remoteIsNewer; + }, + + // Upload outgoing records. + async _uploadOutgoing() { + this._log.trace("Uploading local changes to server."); + + // collection we'll upload + let up = new Collection(this.engineURL, null, this.service); + let modifiedIDs = new Set(this._modified.ids()); + let countTelemetry = new SyncedRecordsTelemetry(); + let counts = countTelemetry.outgoingCounts; + this._log.info(`Uploading ${modifiedIDs.size} outgoing records`); + if (modifiedIDs.size) { + counts.sent = modifiedIDs.size; + + let failed = []; + let successful = []; + let lastSync = await this.getLastSync(); + let handleResponse = async (postQueue, resp, batchOngoing) => { + // Note: We don't want to update this.lastSync, or this._modified until + // the batch is complete, however we want to remember success/failure + // indicators for when that happens. + if (!resp.success) { + this._log.debug(`Uploading records failed: ${resp.status}`); + resp.failureCode = + resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL; + throw resp; + } + + // Update server timestamp from the upload. + failed = failed.concat(Object.keys(resp.obj.failed)); + successful = successful.concat(resp.obj.success); + + if (batchOngoing) { + // Nothing to do yet + return; + } + + if (failed.length && this._log.level <= Log.Level.Debug) { + this._log.debug( + "Records that will be uploaded again because " + + "the server couldn't store them: " + + failed.join(", ") + ); + } + + counts.failed += failed.length; + Object.values(failed).forEach(message => { + countTelemetry.addOutgoingFailedReason(message); + }); + + for (let id of successful) { + this._modified.delete(id); + } + + await this._onRecordsWritten( + successful, + failed, + postQueue.lastModified + ); + + // Advance lastSync since we've finished the batch. + if (postQueue.lastModified > lastSync) { + lastSync = postQueue.lastModified; + await this.setLastSync(lastSync); + } + + // clear for next batch + failed.length = 0; + successful.length = 0; + }; + + let postQueue = up.newPostQueue(this._log, lastSync, handleResponse); + + for (let id of modifiedIDs) { + let out; + let ok = false; + try { + out = await this._createRecord(id); + if (this._log.level <= Log.Level.Trace) { + this._log.trace("Outgoing: " + out); + } + await out.encrypt( + this.service.collectionKeys.keyForCollection(this.name) + ); + ok = true; + } catch (ex) { + this._log.warn("Error creating record", ex); + ++counts.failed; + countTelemetry.addOutgoingFailedReason(ex.message); + if (Async.isShutdownException(ex) || !this.allowSkippedRecord) { + if (!this.allowSkippedRecord) { + // Don't bother for shutdown errors + Observers.notify("weave:engine:sync:uploaded", counts, this.name); + } + throw ex; + } + } + if (ok) { + let { enqueued, error } = await postQueue.enqueue(out); + if (!enqueued) { + ++counts.failed; + countTelemetry.addOutgoingFailedReason(error.message); + if (!this.allowSkippedRecord) { + Observers.notify("weave:engine:sync:uploaded", counts, this.name); + this._log.warn( + `Failed to enqueue record "${id}" (aborting)`, + error + ); + throw error; + } + this._modified.delete(id); + this._log.warn( + `Failed to enqueue record "${id}" (skipping)`, + error + ); + } + } + await Async.promiseYield(); + } + await postQueue.flush(true); + } + + if (counts.sent || counts.failed) { + Observers.notify("weave:engine:sync:uploaded", counts, this.name); + } + }, + + async _onRecordsWritten(succeeded, failed, serverModifiedTime) { + // Implement this method to take specific actions against successfully + // uploaded records and failed records. + }, + + // Any cleanup necessary. + // Save the current snapshot so as to calculate changes at next sync + async _syncFinish() { + this._log.trace("Finishing up sync"); + + let doDelete = async (key, val) => { + let coll = new Collection(this.engineURL, this._recordObj, this.service); + coll[key] = val; + await coll.delete(); + }; + + for (let [key, val] of Object.entries(this._delete)) { + // Remove the key for future uses + delete this._delete[key]; + + this._log.trace("doing post-sync deletions", { key, val }); + // Send a simple delete for the property + if (key != "ids" || val.length <= 100) { + await doDelete(key, val); + } else { + // For many ids, split into chunks of at most 100 + while (val.length) { + await doDelete(key, val.slice(0, 100)); + val = val.slice(100); + } + } + } + this.hasSyncedThisSession = true; + await this._tracker.asyncObserver.promiseObserversComplete(); + }, + + async _syncCleanup() { + try { + // Mark failed WBOs as changed again so they are reuploaded next time. + await this.trackRemainingChanges(); + } finally { + this._modified.clear(); + } + }, + + async _sync() { + try { + Async.checkAppReady(); + await this._syncStartup(); + Async.checkAppReady(); + Observers.notify("weave:engine:sync:status", "process-incoming"); + await this._processIncoming(); + Async.checkAppReady(); + Observers.notify("weave:engine:sync:status", "upload-outgoing"); + try { + await this._uploadOutgoing(); + Async.checkAppReady(); + await this._syncFinish(); + } catch (ex) { + if (!ex.status || ex.status != 412) { + throw ex; + } + // a 412 posting just means another client raced - but we don't want + // to treat that as a sync error - the next sync is almost certain + // to work. + this._log.warn("412 error during sync - will retry."); + } + } finally { + await this._syncCleanup(); + } + }, + + async canDecrypt() { + // Report failure even if there's nothing to decrypt + let canDecrypt = false; + + // Fetch the most recently uploaded record and try to decrypt it + let test = new Collection(this.engineURL, this._recordObj, this.service); + test.limit = 1; + test.sort = "newest"; + test.full = true; + + let key = this.service.collectionKeys.keyForCollection(this.name); + + // Any failure fetching/decrypting will just result in false + try { + this._log.trace("Trying to decrypt a record from the server.."); + let json = (await test.get()).obj[0]; + let record = new this._recordObj(); + record.deserialize(json); + await record.decrypt(key); + canDecrypt = true; + } catch (ex) { + if (Async.isShutdownException(ex)) { + throw ex; + } + this._log.debug("Failed test decrypt", ex); + } + + return canDecrypt; + }, + + /** + * Deletes the collection for this engine on the server, and removes all local + * Sync metadata for this engine. This does *not* remove any existing data on + * other clients. This is called when we reset the sync ID. + */ + async wipeServer() { + await this._deleteServerCollection(); + await this._resetClient(); + }, + + /** + * Deletes the collection for this engine on the server, without removing + * any local Sync metadata or user data. Deleting the collection will not + * remove any user data on other clients, but will force other clients to + * start over as a first sync. + */ + async _deleteServerCollection() { + let response = await this.service.resource(this.engineURL).delete(); + if (response.status != 200 && response.status != 404) { + throw response; + } + }, + + async removeClientData() { + // Implement this method in engines that store client specific data + // on the server. + }, + + /* + * Decide on (and partially effect) an error-handling strategy. + * + * Asks the Service to respond to an HMAC error, which might result in keys + * being downloaded. That call returns true if an action which might allow a + * retry to occur. + * + * If `mayRetry` is truthy, and the Service suggests a retry, + * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns + * kRecoveryStrategy.error. + * + * Subclasses of SyncEngine can override this method to allow for different + * behavior -- e.g., to delete and ignore erroneous entries. + * + * All return values will be part of the kRecoveryStrategy enumeration. + */ + async handleHMACMismatch(item, mayRetry) { + // By default we either try again, or bail out noisily. + return (await this.service.handleHMACEvent()) && mayRetry + ? SyncEngine.kRecoveryStrategy.retry + : SyncEngine.kRecoveryStrategy.error; + }, + + /** + * Returns a changeset containing all items in the store. The default + * implementation returns a changeset with timestamps from long ago, to + * ensure we always use the remote version if one exists. + * + * This function is only called for the first sync. Subsequent syncs call + * `pullNewChanges`. + * + * @return A `Changeset` object. + */ + async pullAllChanges() { + let changes = {}; + let ids = await this._store.getAllIDs(); + for (let id in ids) { + changes[id] = 0; + } + return changes; + }, + + /* + * Returns a changeset containing entries for all currently tracked items. + * The default implementation returns a changeset with timestamps indicating + * when the item was added to the tracker. + * + * @return A `Changeset` object. + */ + async pullNewChanges() { + await this._tracker.asyncObserver.promiseObserversComplete(); + return this.getChangedIDs(); + }, + + /** + * Adds all remaining changeset entries back to the tracker, typically for + * items that failed to upload. This method is called at the end of each sync. + * + */ + async trackRemainingChanges() { + for (let [id, change] of this._modified.entries()) { + await this._tracker.addChangedID(id, change); + } + }, + + /** + * Removes all local Sync metadata for this engine, but keeps all existing + * local user data. + */ + async resetClient() { + return this._notify("reset-client", this.name, this._resetClient)(); + }, + + async _resetClient() { + await this.resetLastSync(); + this.hasSyncedThisSession = false; + this.previousFailed = new SerializableSet(); + this.toFetch = new SerializableSet(); + }, + + /** + * Removes all local Sync metadata and user data for this engine. + */ + async wipeClient() { + return this._notify("wipe-client", this.name, this._wipeClient)(); + }, + + async _wipeClient() { + await this.resetClient(); + this._log.debug("Deleting all local data"); + this._tracker.ignoreAll = true; + await this._store.wipe(); + this._tracker.ignoreAll = false; + this._tracker.clearChangedIDs(); + }, + + /** + * If one exists, initialize and return a validator for this engine (which + * must have a `validate(engine)` method that returns a promise to an object + * with a getSummary method). Otherwise return null. + */ + getValidator() { + return null; + }, + + async finalize() { + Svc.Prefs.ignore(`engine.${this.prefName}`, this.asyncObserver); + await this.asyncObserver.promiseObserversComplete(); + await this._tracker.finalize(); + await this._toFetchStorage.finalize(); + await this._previousFailedStorage.finalize(); + }, + + // Returns a new watchdog. Exposed for tests. + _newWatchdog() { + return Async.watchdog(); + }, +}; + +/** + * A changeset is created for each sync in `Engine::get{Changed, All}IDs`, + * and stores opaque change data for tracked IDs. The default implementation + * only records timestamps, though engines can extend this to store additional + * data for each entry. + */ +export class Changeset { + // Creates an empty changeset. + constructor() { + this.changes = {}; + } + + // Returns the last modified time, in seconds, for an entry in the changeset. + // `id` is guaranteed to be in the set. + getModifiedTimestamp(id) { + return this.changes[id]; + } + + // Adds a change for a tracked ID to the changeset. + set(id, change) { + this.changes[id] = change; + } + + // Adds multiple entries to the changeset, preserving existing entries. + insert(changes) { + Object.assign(this.changes, changes); + } + + // Overwrites the existing set of tracked changes with new entries. + replace(changes) { + this.changes = changes; + } + + // Indicates whether an entry is in the changeset. + has(id) { + return id in this.changes; + } + + // Deletes an entry from the changeset. Used to clean up entries for + // reconciled and successfully uploaded records. + delete(id) { + delete this.changes[id]; + } + + // Changes the ID of an entry in the changeset. Used when reconciling + // duplicates that have local changes. + changeID(oldID, newID) { + this.changes[newID] = this.changes[oldID]; + delete this.changes[oldID]; + } + + // Returns an array of all tracked IDs in this changeset. + ids() { + return Object.keys(this.changes); + } + + // Returns an array of `[id, change]` tuples. Used to repopulate the tracker + // with entries for failed uploads at the end of a sync. + entries() { + return Object.entries(this.changes); + } + + // Returns the number of entries in this changeset. + count() { + return this.ids().length; + } + + // Clears the changeset. + clear() { + this.changes = {}; + } +} diff --git a/services/sync/modules/engines/addons.sys.mjs b/services/sync/modules/engines/addons.sys.mjs new file mode 100644 index 0000000000..d1b766a957 --- /dev/null +++ b/services/sync/modules/engines/addons.sys.mjs @@ -0,0 +1,820 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/* + * This file defines the add-on sync functionality. + * + * There are currently a number of known limitations: + * - We only sync XPI extensions and themes available from addons.mozilla.org. + * We hope to expand support for other add-ons eventually. + * - We only attempt syncing of add-ons between applications of the same type. + * This means add-ons will not synchronize between Firefox desktop and + * Firefox mobile, for example. This is because of significant add-on + * incompatibility between application types. + * + * Add-on records exist for each known {add-on, app-id} pair in the Sync client + * set. Each record has a randomly chosen GUID. The records then contain + * basic metadata about the add-on. + * + * We currently synchronize: + * + * - Installations + * - Uninstallations + * - User enabling and disabling + * + * Synchronization is influenced by the following preferences: + * + * - services.sync.addons.ignoreUserEnabledChanges + * - services.sync.addons.trustedSourceHostnames + * + * and also influenced by whether addons have repository caching enabled and + * whether they allow installation of addons from insecure options (both of + * which are themselves influenced by the "extensions." pref branch) + * + * See the documentation in all.js for the behavior of these prefs. + */ + +import { Preferences } from "resource://gre/modules/Preferences.sys.mjs"; + +import { AddonUtils } from "resource://services-sync/addonutils.sys.mjs"; +import { AddonsReconciler } from "resource://services-sync/addonsreconciler.sys.mjs"; +import { + Store, + SyncEngine, + LegacyTracker, +} from "resource://services-sync/engines.sys.mjs"; +import { CryptoWrapper } from "resource://services-sync/record.sys.mjs"; +import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; + +import { SCORE_INCREMENT_XLARGE } from "resource://services-sync/constants.sys.mjs"; +import { CollectionValidator } from "resource://services-sync/collection_validator.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + AddonManager: "resource://gre/modules/AddonManager.sys.mjs", + AddonRepository: "resource://gre/modules/addons/AddonRepository.sys.mjs", +}); + +// 7 days in milliseconds. +const PRUNE_ADDON_CHANGES_THRESHOLD = 60 * 60 * 24 * 7 * 1000; + +/** + * AddonRecord represents the state of an add-on in an application. + * + * Each add-on has its own record for each application ID it is installed + * on. + * + * The ID of add-on records is a randomly-generated GUID. It is random instead + * of deterministic so the URIs of the records cannot be guessed and so + * compromised server credentials won't result in disclosure of the specific + * add-ons present in a Sync account. + * + * The record contains the following fields: + * + * addonID + * ID of the add-on. This correlates to the "id" property on an Addon type. + * + * applicationID + * The application ID this record is associated with. + * + * enabled + * Boolean stating whether add-on is enabled or disabled by the user. + * + * source + * String indicating where an add-on is from. Currently, we only support + * the value "amo" which indicates that the add-on came from the official + * add-ons repository, addons.mozilla.org. In the future, we may support + * installing add-ons from other sources. This provides a future-compatible + * mechanism for clients to only apply records they know how to handle. + */ +function AddonRecord(collection, id) { + CryptoWrapper.call(this, collection, id); +} +AddonRecord.prototype = { + _logName: "Record.Addon", +}; +Object.setPrototypeOf(AddonRecord.prototype, CryptoWrapper.prototype); + +Utils.deferGetSet(AddonRecord, "cleartext", [ + "addonID", + "applicationID", + "enabled", + "source", +]); + +/** + * The AddonsEngine handles synchronization of add-ons between clients. + * + * The engine maintains an instance of an AddonsReconciler, which is the entity + * maintaining state for add-ons. It provides the history and tracking APIs + * that AddonManager doesn't. + * + * The engine instance overrides a handful of functions on the base class. The + * rationale for each is documented by that function. + */ +export function AddonsEngine(service) { + SyncEngine.call(this, "Addons", service); + + this._reconciler = new AddonsReconciler(this._tracker.asyncObserver); +} + +AddonsEngine.prototype = { + _storeObj: AddonsStore, + _trackerObj: AddonsTracker, + _recordObj: AddonRecord, + version: 1, + + syncPriority: 5, + + _reconciler: null, + + async initialize() { + await SyncEngine.prototype.initialize.call(this); + await this._reconciler.ensureStateLoaded(); + }, + + /** + * Override parent method to find add-ons by their public ID, not Sync GUID. + */ + async _findDupe(item) { + let id = item.addonID; + + // The reconciler should have been updated at the top of the sync, so we + // can assume it is up to date when this function is called. + let addons = this._reconciler.addons; + if (!(id in addons)) { + return null; + } + + let addon = addons[id]; + if (addon.guid != item.id) { + return addon.guid; + } + + return null; + }, + + /** + * Override getChangedIDs to pull in tracker changes plus changes from the + * reconciler log. + */ + async getChangedIDs() { + let changes = {}; + const changedIDs = await this._tracker.getChangedIDs(); + for (let [id, modified] of Object.entries(changedIDs)) { + changes[id] = modified; + } + + let lastSync = await this.getLastSync(); + let lastSyncDate = new Date(lastSync * 1000); + + // The reconciler should have been refreshed at the beginning of a sync and + // we assume this function is only called from within a sync. + let reconcilerChanges = this._reconciler.getChangesSinceDate(lastSyncDate); + let addons = this._reconciler.addons; + for (let change of reconcilerChanges) { + let changeTime = change[0]; + let id = change[2]; + + if (!(id in addons)) { + continue; + } + + // Keep newest modified time. + if (id in changes && changeTime < changes[id]) { + continue; + } + + if (!(await this.isAddonSyncable(addons[id]))) { + continue; + } + + this._log.debug("Adding changed add-on from changes log: " + id); + let addon = addons[id]; + changes[addon.guid] = changeTime.getTime() / 1000; + } + + return changes; + }, + + /** + * Override start of sync function to refresh reconciler. + * + * Many functions in this class assume the reconciler is refreshed at the + * top of a sync. If this ever changes, those functions should be revisited. + * + * Technically speaking, we don't need to refresh the reconciler on every + * sync since it is installed as an AddonManager listener. However, add-ons + * are complicated and we force a full refresh, just in case the listeners + * missed something. + */ + async _syncStartup() { + // We refresh state before calling parent because syncStartup in the parent + // looks for changed IDs, which is dependent on add-on state being up to + // date. + await this._refreshReconcilerState(); + return SyncEngine.prototype._syncStartup.call(this); + }, + + /** + * Override end of sync to perform a little housekeeping on the reconciler. + * + * We prune changes to prevent the reconciler state from growing without + * bound. Even if it grows unbounded, there would have to be many add-on + * changes (thousands) for it to slow things down significantly. This is + * highly unlikely to occur. Still, we exercise defense just in case. + */ + async _syncCleanup() { + let lastSync = await this.getLastSync(); + let ms = 1000 * lastSync - PRUNE_ADDON_CHANGES_THRESHOLD; + this._reconciler.pruneChangesBeforeDate(new Date(ms)); + return SyncEngine.prototype._syncCleanup.call(this); + }, + + /** + * Helper function to ensure reconciler is up to date. + * + * This will load the reconciler's state from the file + * system (if needed) and refresh the state of the reconciler. + */ + async _refreshReconcilerState() { + this._log.debug("Refreshing reconciler state"); + return this._reconciler.refreshGlobalState(); + }, + + // Returns a promise + isAddonSyncable(addon, ignoreRepoCheck) { + return this._store.isAddonSyncable(addon, ignoreRepoCheck); + }, +}; +Object.setPrototypeOf(AddonsEngine.prototype, SyncEngine.prototype); + +/** + * This is the primary interface between Sync and the Addons Manager. + * + * In addition to the core store APIs, we provide convenience functions to wrap + * Add-on Manager APIs with Sync-specific semantics. + */ +function AddonsStore(name, engine) { + Store.call(this, name, engine); +} +AddonsStore.prototype = { + // Define the add-on types (.type) that we support. + _syncableTypes: ["extension", "theme"], + + _extensionsPrefs: new Preferences("extensions."), + + get reconciler() { + return this.engine._reconciler; + }, + + /** + * Override applyIncoming to filter out records we can't handle. + */ + async applyIncoming(record) { + // The fields we look at aren't present when the record is deleted. + if (!record.deleted) { + // Ignore records not belonging to our application ID because that is the + // current policy. + if (record.applicationID != Services.appinfo.ID) { + this._log.info( + "Ignoring incoming record from other App ID: " + record.id + ); + return; + } + + // Ignore records that aren't from the official add-on repository, as that + // is our current policy. + if (record.source != "amo") { + this._log.info( + "Ignoring unknown add-on source (" + + record.source + + ")" + + " for " + + record.id + ); + return; + } + } + + // Ignore incoming records for which an existing non-syncable addon + // exists. Note that we do not insist that the addon manager already have + // metadata for this addon - it's possible our reconciler previously saw the + // addon but the addon-manager cache no longer has it - which is fine for a + // new incoming addon. + // (Note that most other cases where the addon-manager cache is invalid + // doesn't get this treatment because that cache self-repairs after some + // time - but it only re-populates addons which are currently installed.) + let existingMeta = this.reconciler.addons[record.addonID]; + if ( + existingMeta && + !(await this.isAddonSyncable(existingMeta, /* ignoreRepoCheck */ true)) + ) { + this._log.info( + "Ignoring incoming record for an existing but non-syncable addon", + record.addonID + ); + return; + } + + await Store.prototype.applyIncoming.call(this, record); + }, + + /** + * Provides core Store API to create/install an add-on from a record. + */ + async create(record) { + // This will throw if there was an error. This will get caught by the sync + // engine and the record will try to be applied later. + const results = await AddonUtils.installAddons([ + { + id: record.addonID, + syncGUID: record.id, + enabled: record.enabled, + requireSecureURI: this._extensionsPrefs.get( + "install.requireSecureOrigin", + true + ), + }, + ]); + + if (results.skipped.includes(record.addonID)) { + this._log.info("Add-on skipped: " + record.addonID); + // Just early-return for skipped addons - we don't want to arrange to + // try again next time because the condition that caused up to skip + // will remain true for this addon forever. + return; + } + + let addon; + for (let a of results.addons) { + if (a.id == record.addonID) { + addon = a; + break; + } + } + + // This should never happen, but is present as a fail-safe. + if (!addon) { + throw new Error("Add-on not found after install: " + record.addonID); + } + + this._log.info("Add-on installed: " + record.addonID); + }, + + /** + * Provides core Store API to remove/uninstall an add-on from a record. + */ + async remove(record) { + // If this is called, the payload is empty, so we have to find by GUID. + let addon = await this.getAddonByGUID(record.id); + if (!addon) { + // We don't throw because if the add-on could not be found then we assume + // it has already been uninstalled and there is nothing for this function + // to do. + return; + } + + this._log.info("Uninstalling add-on: " + addon.id); + await AddonUtils.uninstallAddon(addon); + }, + + /** + * Provides core Store API to update an add-on from a record. + */ + async update(record) { + let addon = await this.getAddonByID(record.addonID); + + // update() is called if !this.itemExists. And, since itemExists consults + // the reconciler only, we need to take care of some corner cases. + // + // First, the reconciler could know about an add-on that was uninstalled + // and no longer present in the add-ons manager. + if (!addon) { + await this.create(record); + return; + } + + // It's also possible that the add-on is non-restartless and has pending + // install/uninstall activity. + // + // We wouldn't get here if the incoming record was for a deletion. So, + // check for pending uninstall and cancel if necessary. + if (addon.pendingOperations & lazy.AddonManager.PENDING_UNINSTALL) { + addon.cancelUninstall(); + + // We continue with processing because there could be state or ID change. + } + + await this.updateUserDisabled(addon, !record.enabled); + }, + + /** + * Provide core Store API to determine if a record exists. + */ + async itemExists(guid) { + let addon = this.reconciler.getAddonStateFromSyncGUID(guid); + + return !!addon; + }, + + /** + * Create an add-on record from its GUID. + * + * @param guid + * Add-on GUID (from extensions DB) + * @param collection + * Collection to add record to. + * + * @return AddonRecord instance + */ + async createRecord(guid, collection) { + let record = new AddonRecord(collection, guid); + record.applicationID = Services.appinfo.ID; + + let addon = this.reconciler.getAddonStateFromSyncGUID(guid); + + // If we don't know about this GUID or if it has been uninstalled, we mark + // the record as deleted. + if (!addon || !addon.installed) { + record.deleted = true; + return record; + } + + record.modified = addon.modified.getTime() / 1000; + + record.addonID = addon.id; + record.enabled = addon.enabled; + + // This needs to be dynamic when add-ons don't come from AddonRepository. + record.source = "amo"; + + return record; + }, + + /** + * Changes the id of an add-on. + * + * This implements a core API of the store. + */ + async changeItemID(oldID, newID) { + // We always update the GUID in the reconciler because it will be + // referenced later in the sync process. + let state = this.reconciler.getAddonStateFromSyncGUID(oldID); + if (state) { + state.guid = newID; + await this.reconciler.saveState(); + } + + let addon = await this.getAddonByGUID(oldID); + if (!addon) { + this._log.debug( + "Cannot change item ID (" + + oldID + + ") in Add-on " + + "Manager because old add-on not present: " + + oldID + ); + return; + } + + addon.syncGUID = newID; + }, + + /** + * Obtain the set of all syncable add-on Sync GUIDs. + * + * This implements a core Store API. + */ + async getAllIDs() { + let ids = {}; + + let addons = this.reconciler.addons; + for (let id in addons) { + let addon = addons[id]; + if (await this.isAddonSyncable(addon)) { + ids[addon.guid] = true; + } + } + + return ids; + }, + + /** + * Wipe engine data. + * + * This uninstalls all syncable addons from the application. In case of + * error, it logs the error and keeps trying with other add-ons. + */ + async wipe() { + this._log.info("Processing wipe."); + + await this.engine._refreshReconcilerState(); + + // We only wipe syncable add-ons. Wipe is a Sync feature not a security + // feature. + let ids = await this.getAllIDs(); + for (let guid in ids) { + let addon = await this.getAddonByGUID(guid); + if (!addon) { + this._log.debug( + "Ignoring add-on because it couldn't be obtained: " + guid + ); + continue; + } + + this._log.info("Uninstalling add-on as part of wipe: " + addon.id); + await Utils.catch.call(this, () => addon.uninstall())(); + } + }, + + /** ************************************************************************* + * Functions below are unique to this store and not part of the Store API * + ***************************************************************************/ + + /** + * Obtain an add-on from its public ID. + * + * @param id + * Add-on ID + * @return Addon or undefined if not found + */ + async getAddonByID(id) { + return lazy.AddonManager.getAddonByID(id); + }, + + /** + * Obtain an add-on from its Sync GUID. + * + * @param guid + * Add-on Sync GUID + * @return DBAddonInternal or null + */ + async getAddonByGUID(guid) { + return lazy.AddonManager.getAddonBySyncGUID(guid); + }, + + /** + * Determines whether an add-on is suitable for Sync. + * + * @param addon + * Addon instance + * @param ignoreRepoCheck + * Should we skip checking the Addons repository (primarially useful + * for testing and validation). + * @return Boolean indicating whether it is appropriate for Sync + */ + async isAddonSyncable(addon, ignoreRepoCheck = false) { + // Currently, we limit syncable add-ons to those that are: + // 1) In a well-defined set of types + // 2) Installed in the current profile + // 3) Not installed by a foreign entity (i.e. installed by the app) + // since they act like global extensions. + // 4) Is not a hotfix. + // 5) The addons XPIProvider doesn't veto it (i.e not being installed in + // the profile directory, or any other reasons it says the addon can't + // be synced) + // 6) Are installed from AMO + + // We could represent the test as a complex boolean expression. We go the + // verbose route so the failure reason is logged. + if (!addon) { + this._log.debug("Null object passed to isAddonSyncable."); + return false; + } + + if (!this._syncableTypes.includes(addon.type)) { + this._log.debug( + addon.id + " not syncable: type not in allowed list: " + addon.type + ); + return false; + } + + if (!(addon.scope & lazy.AddonManager.SCOPE_PROFILE)) { + this._log.debug(addon.id + " not syncable: not installed in profile."); + return false; + } + + // If the addon manager says it's not syncable, we skip it. + if (!addon.isSyncable) { + this._log.debug(addon.id + " not syncable: vetoed by the addon manager."); + return false; + } + + // This may be too aggressive. If an add-on is downloaded from AMO and + // manually placed in the profile directory, foreignInstall will be set. + // Arguably, that add-on should be syncable. + // TODO Address the edge case and come up with more robust heuristics. + if (addon.foreignInstall) { + this._log.debug(addon.id + " not syncable: is foreign install."); + return false; + } + + // If the AddonRepository's cache isn't enabled (which it typically isn't + // in tests), getCachedAddonByID always returns null - so skip the check + // in that case. We also provide a way to specifically opt-out of the check + // even if the cache is enabled, which is used by the validators. + if (ignoreRepoCheck || !lazy.AddonRepository.cacheEnabled) { + return true; + } + + let result = await new Promise(res => { + lazy.AddonRepository.getCachedAddonByID(addon.id, res); + }); + + if (!result) { + this._log.debug( + addon.id + " not syncable: add-on not found in add-on repository." + ); + return false; + } + + return this.isSourceURITrusted(result.sourceURI); + }, + + /** + * Determine whether an add-on's sourceURI field is trusted and the add-on + * can be installed. + * + * This function should only ever be called from isAddonSyncable(). It is + * exposed as a separate function to make testing easier. + * + * @param uri + * nsIURI instance to validate + * @return bool + */ + isSourceURITrusted: function isSourceURITrusted(uri) { + // For security reasons, we currently limit synced add-ons to those + // installed from trusted hostname(s). We additionally require TLS with + // the add-ons site to help prevent forgeries. + let trustedHostnames = Svc.Prefs.get( + "addons.trustedSourceHostnames", + "" + ).split(","); + + if (!uri) { + this._log.debug("Undefined argument to isSourceURITrusted()."); + return false; + } + + // Scheme is validated before the hostname because uri.host may not be + // populated for certain schemes. It appears to always be populated for + // https, so we avoid the potential NS_ERROR_FAILURE on field access. + if (uri.scheme != "https") { + this._log.debug("Source URI not HTTPS: " + uri.spec); + return false; + } + + if (!trustedHostnames.includes(uri.host)) { + this._log.debug("Source hostname not trusted: " + uri.host); + return false; + } + + return true; + }, + + /** + * Update the userDisabled flag on an add-on. + * + * This will enable or disable an add-on. It has no return value and does + * not catch or handle exceptions thrown by the addon manager. If no action + * is needed it will return immediately. + * + * @param addon + * Addon instance to manipulate. + * @param value + * Boolean to which to set userDisabled on the passed Addon. + */ + async updateUserDisabled(addon, value) { + if (addon.userDisabled == value) { + return; + } + + // A pref allows changes to the enabled flag to be ignored. + if (Svc.Prefs.get("addons.ignoreUserEnabledChanges", false)) { + this._log.info( + "Ignoring enabled state change due to preference: " + addon.id + ); + return; + } + + AddonUtils.updateUserDisabled(addon, value); + // updating this flag doesn't send a notification for appDisabled addons, + // meaning the reconciler will not update its state and may resync the + // addon - so explicitly rectify the state (bug 1366994) + if (addon.appDisabled) { + await this.reconciler.rectifyStateFromAddon(addon); + } + }, +}; + +Object.setPrototypeOf(AddonsStore.prototype, Store.prototype); + +/** + * The add-ons tracker keeps track of real-time changes to add-ons. + * + * It hooks up to the reconciler and receives notifications directly from it. + */ +function AddonsTracker(name, engine) { + LegacyTracker.call(this, name, engine); +} +AddonsTracker.prototype = { + get reconciler() { + return this.engine._reconciler; + }, + + get store() { + return this.engine._store; + }, + + /** + * This callback is executed whenever the AddonsReconciler sends out a change + * notification. See AddonsReconciler.addChangeListener(). + */ + async changeListener(date, change, addon) { + this._log.debug("changeListener invoked: " + change + " " + addon.id); + // Ignore changes that occur during sync. + if (this.ignoreAll) { + return; + } + + if (!(await this.store.isAddonSyncable(addon))) { + this._log.debug( + "Ignoring change because add-on isn't syncable: " + addon.id + ); + return; + } + + const added = await this.addChangedID(addon.guid, date.getTime() / 1000); + if (added) { + this.score += SCORE_INCREMENT_XLARGE; + } + }, + + onStart() { + this.reconciler.startListening(); + this.reconciler.addChangeListener(this); + }, + + onStop() { + this.reconciler.removeChangeListener(this); + this.reconciler.stopListening(); + }, +}; + +Object.setPrototypeOf(AddonsTracker.prototype, LegacyTracker.prototype); + +export class AddonValidator extends CollectionValidator { + constructor(engine = null) { + super("addons", "id", ["addonID", "enabled", "applicationID", "source"]); + this.engine = engine; + } + + async getClientItems() { + return lazy.AddonManager.getAllAddons(); + } + + normalizeClientItem(item) { + let enabled = !item.userDisabled; + if (item.pendingOperations & lazy.AddonManager.PENDING_ENABLE) { + enabled = true; + } else if (item.pendingOperations & lazy.AddonManager.PENDING_DISABLE) { + enabled = false; + } + return { + enabled, + id: item.syncGUID, + addonID: item.id, + applicationID: Services.appinfo.ID, + source: "amo", // check item.foreignInstall? + original: item, + }; + } + + async normalizeServerItem(item) { + let guid = await this.engine._findDupe(item); + if (guid) { + item.id = guid; + } + return item; + } + + clientUnderstands(item) { + return item.applicationID === Services.appinfo.ID; + } + + async syncedByClient(item) { + return ( + !item.original.hidden && + !item.original.isSystem && + !( + item.original.pendingOperations & lazy.AddonManager.PENDING_UNINSTALL + ) && + // No need to await the returned promise explicitely: + // |expr1 && expr2| evaluates to expr2 if expr1 is true. + this.engine.isAddonSyncable(item.original, true) + ); + } +} diff --git a/services/sync/modules/engines/bookmarks.sys.mjs b/services/sync/modules/engines/bookmarks.sys.mjs new file mode 100644 index 0000000000..8724344084 --- /dev/null +++ b/services/sync/modules/engines/bookmarks.sys.mjs @@ -0,0 +1,953 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; + +import { Async } from "resource://services-common/async.sys.mjs"; + +import { SCORE_INCREMENT_XLARGE } from "resource://services-sync/constants.sys.mjs"; +import { + Changeset, + Store, + SyncEngine, + Tracker, +} from "resource://services-sync/engines.sys.mjs"; +import { CryptoWrapper } from "resource://services-sync/record.sys.mjs"; +import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + Observers: "resource://services-common/observers.sys.mjs", + PlacesBackups: "resource://gre/modules/PlacesBackups.sys.mjs", + PlacesDBUtils: "resource://gre/modules/PlacesDBUtils.sys.mjs", + PlacesSyncUtils: "resource://gre/modules/PlacesSyncUtils.sys.mjs", + PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", + Resource: "resource://services-sync/resource.sys.mjs", + SyncedBookmarksMirror: "resource://gre/modules/SyncedBookmarksMirror.sys.mjs", +}); + +const PLACES_MAINTENANCE_INTERVAL_SECONDS = 4 * 60 * 60; // 4 hours. + +const FOLDER_SORTINDEX = 1000000; + +// Roots that should be deleted from the server, instead of applied locally. +// This matches `AndroidBrowserBookmarksRepositorySession::forbiddenGUID`, +// but allows tags because we don't want to reparent tag folders or tag items +// to "unfiled". +const FORBIDDEN_INCOMING_IDS = ["pinned", "places", "readinglist"]; + +// Items with these parents should be deleted from the server. We allow +// children of the Places root, to avoid orphaning left pane queries and other +// descendants of custom roots. +const FORBIDDEN_INCOMING_PARENT_IDS = ["pinned", "readinglist"]; + +// The tracker ignores changes made by import and restore, to avoid bumping the +// score and triggering syncs during the process, as well as changes made by +// Sync. +XPCOMUtils.defineLazyGetter(lazy, "IGNORED_SOURCES", () => [ + lazy.PlacesUtils.bookmarks.SOURCES.SYNC, + lazy.PlacesUtils.bookmarks.SOURCES.IMPORT, + lazy.PlacesUtils.bookmarks.SOURCES.RESTORE, + lazy.PlacesUtils.bookmarks.SOURCES.RESTORE_ON_STARTUP, + lazy.PlacesUtils.bookmarks.SOURCES.SYNC_REPARENT_REMOVED_FOLDER_CHILDREN, +]); + +// The validation telemetry version for the engine. Version 1 is collected +// by `bookmark_validator.js`, and checks value as well as structure +// differences. Version 2 is collected by the engine as part of building the +// remote tree, and checks structure differences only. +const BOOKMARK_VALIDATOR_VERSION = 2; + +// The maximum time that the engine should wait before aborting a bookmark +// merge. +const BOOKMARK_APPLY_TIMEOUT_MS = 5 * 60 * 60 * 1000; // 5 minutes + +// The default frecency value to use when not known. +const FRECENCY_UNKNOWN = -1; + +// Returns the constructor for a bookmark record type. +function getTypeObject(type) { + switch (type) { + case "bookmark": + return Bookmark; + case "query": + return BookmarkQuery; + case "folder": + return BookmarkFolder; + case "livemark": + return Livemark; + case "separator": + return BookmarkSeparator; + case "item": + return PlacesItem; + } + return null; +} + +export function PlacesItem(collection, id, type) { + CryptoWrapper.call(this, collection, id); + this.type = type || "item"; +} + +PlacesItem.prototype = { + async decrypt(keyBundle) { + // Do the normal CryptoWrapper decrypt, but change types before returning + let clear = await CryptoWrapper.prototype.decrypt.call(this, keyBundle); + + // Convert the abstract places item to the actual object type + if (!this.deleted) { + Object.setPrototypeOf(this, this.getTypeObject(this.type).prototype); + } + + return clear; + }, + + getTypeObject: function PlacesItem_getTypeObject(type) { + let recordObj = getTypeObject(type); + if (!recordObj) { + throw new Error("Unknown places item object type: " + type); + } + return recordObj; + }, + + _logName: "Sync.Record.PlacesItem", + + // Converts the record to a Sync bookmark object that can be passed to + // `PlacesSyncUtils.bookmarks.{insert, update}`. + toSyncBookmark() { + let result = { + kind: this.type, + recordId: this.id, + parentRecordId: this.parentid, + }; + let dateAdded = lazy.PlacesSyncUtils.bookmarks.ratchetTimestampBackwards( + this.dateAdded, + +this.modified * 1000 + ); + if (dateAdded > 0) { + result.dateAdded = dateAdded; + } + return result; + }, + + // Populates the record from a Sync bookmark object returned from + // `PlacesSyncUtils.bookmarks.fetch`. + fromSyncBookmark(item) { + this.parentid = item.parentRecordId; + this.parentName = item.parentTitle; + if (item.dateAdded) { + this.dateAdded = item.dateAdded; + } + }, +}; + +Object.setPrototypeOf(PlacesItem.prototype, CryptoWrapper.prototype); + +Utils.deferGetSet(PlacesItem, "cleartext", [ + "hasDupe", + "parentid", + "parentName", + "type", + "dateAdded", +]); + +export function Bookmark(collection, id, type) { + PlacesItem.call(this, collection, id, type || "bookmark"); +} + +Bookmark.prototype = { + _logName: "Sync.Record.Bookmark", + + toSyncBookmark() { + let info = PlacesItem.prototype.toSyncBookmark.call(this); + info.title = this.title; + info.url = this.bmkUri; + info.description = this.description; + info.tags = this.tags; + info.keyword = this.keyword; + return info; + }, + + fromSyncBookmark(item) { + PlacesItem.prototype.fromSyncBookmark.call(this, item); + this.title = item.title; + this.bmkUri = item.url.href; + this.description = item.description; + this.tags = item.tags; + this.keyword = item.keyword; + }, +}; + +Object.setPrototypeOf(Bookmark.prototype, PlacesItem.prototype); + +Utils.deferGetSet(Bookmark, "cleartext", [ + "title", + "bmkUri", + "description", + "tags", + "keyword", +]); + +export function BookmarkQuery(collection, id) { + Bookmark.call(this, collection, id, "query"); +} + +BookmarkQuery.prototype = { + _logName: "Sync.Record.BookmarkQuery", + + toSyncBookmark() { + let info = Bookmark.prototype.toSyncBookmark.call(this); + info.folder = this.folderName || undefined; // empty string -> undefined + info.query = this.queryId; + return info; + }, + + fromSyncBookmark(item) { + Bookmark.prototype.fromSyncBookmark.call(this, item); + this.folderName = item.folder || undefined; // empty string -> undefined + this.queryId = item.query; + }, +}; + +Object.setPrototypeOf(BookmarkQuery.prototype, Bookmark.prototype); + +Utils.deferGetSet(BookmarkQuery, "cleartext", ["folderName", "queryId"]); + +export function BookmarkFolder(collection, id, type) { + PlacesItem.call(this, collection, id, type || "folder"); +} + +BookmarkFolder.prototype = { + _logName: "Sync.Record.Folder", + + toSyncBookmark() { + let info = PlacesItem.prototype.toSyncBookmark.call(this); + info.description = this.description; + info.title = this.title; + return info; + }, + + fromSyncBookmark(item) { + PlacesItem.prototype.fromSyncBookmark.call(this, item); + this.title = item.title; + this.description = item.description; + this.children = item.childRecordIds; + }, +}; + +Object.setPrototypeOf(BookmarkFolder.prototype, PlacesItem.prototype); + +Utils.deferGetSet(BookmarkFolder, "cleartext", [ + "description", + "title", + "children", +]); + +export function Livemark(collection, id) { + BookmarkFolder.call(this, collection, id, "livemark"); +} + +Livemark.prototype = { + _logName: "Sync.Record.Livemark", + + toSyncBookmark() { + let info = BookmarkFolder.prototype.toSyncBookmark.call(this); + info.feed = this.feedUri; + info.site = this.siteUri; + return info; + }, + + fromSyncBookmark(item) { + BookmarkFolder.prototype.fromSyncBookmark.call(this, item); + this.feedUri = item.feed.href; + if (item.site) { + this.siteUri = item.site.href; + } + }, +}; + +Object.setPrototypeOf(Livemark.prototype, BookmarkFolder.prototype); + +Utils.deferGetSet(Livemark, "cleartext", ["siteUri", "feedUri"]); + +export function BookmarkSeparator(collection, id) { + PlacesItem.call(this, collection, id, "separator"); +} + +BookmarkSeparator.prototype = { + _logName: "Sync.Record.Separator", + + fromSyncBookmark(item) { + PlacesItem.prototype.fromSyncBookmark.call(this, item); + this.pos = item.index; + }, +}; + +Object.setPrototypeOf(BookmarkSeparator.prototype, PlacesItem.prototype); + +Utils.deferGetSet(BookmarkSeparator, "cleartext", "pos"); + +/** + * The bookmarks engine uses a different store that stages downloaded bookmarks + * in a separate database, instead of writing directly to Places. The buffer + * handles reconciliation, so we stub out `_reconcile`, and wait to pull changes + * until we're ready to upload. + */ +export function BookmarksEngine(service) { + SyncEngine.call(this, "Bookmarks", service); +} + +BookmarksEngine.prototype = { + _recordObj: PlacesItem, + _trackerObj: BookmarksTracker, + _storeObj: BookmarksStore, + version: 2, + // Used to override the engine name in telemetry, so that we can distinguish + // this engine from the old, now removed non-buffered engine. + overrideTelemetryName: "bookmarks-buffered", + + // Needed to ensure we don't miss items when resuming a sync that failed or + // aborted early. + _defaultSort: "oldest", + + syncPriority: 4, + allowSkippedRecord: false, + + async _ensureCurrentSyncID(newSyncID) { + await lazy.PlacesSyncUtils.bookmarks.ensureCurrentSyncId(newSyncID); + let buf = await this._store.ensureOpenMirror(); + await buf.ensureCurrentSyncId(newSyncID); + }, + + async ensureCurrentSyncID(newSyncID) { + let shouldWipeRemote = + await lazy.PlacesSyncUtils.bookmarks.shouldWipeRemote(); + if (!shouldWipeRemote) { + this._log.debug( + "Checking if server sync ID ${newSyncID} matches existing", + { newSyncID } + ); + await this._ensureCurrentSyncID(newSyncID); + return newSyncID; + } + // We didn't take the new sync ID because we need to wipe the server + // and other clients after a restore. Send the command, wipe the + // server, and reset our sync ID to reupload everything. + this._log.debug( + "Ignoring server sync ID ${newSyncID} after restore; " + + "wiping server and resetting sync ID", + { newSyncID } + ); + await this.service.clientsEngine.sendCommand( + "wipeEngine", + [this.name], + null, + { reason: "bookmark-restore" } + ); + let assignedSyncID = await this.resetSyncID(); + return assignedSyncID; + }, + + async getSyncID() { + return lazy.PlacesSyncUtils.bookmarks.getSyncId(); + }, + + async resetSyncID() { + await this._deleteServerCollection(); + return this.resetLocalSyncID(); + }, + + async resetLocalSyncID() { + let newSyncID = await lazy.PlacesSyncUtils.bookmarks.resetSyncId(); + this._log.debug("Assigned new sync ID ${newSyncID}", { newSyncID }); + let buf = await this._store.ensureOpenMirror(); + await buf.ensureCurrentSyncId(newSyncID); + return newSyncID; + }, + + async getLastSync() { + let mirror = await this._store.ensureOpenMirror(); + return mirror.getCollectionHighWaterMark(); + }, + + async setLastSync(lastSync) { + let mirror = await this._store.ensureOpenMirror(); + await mirror.setCollectionLastModified(lastSync); + // Update the last sync time in Places so that reverting to the original + // bookmarks engine doesn't download records we've already applied. + await lazy.PlacesSyncUtils.bookmarks.setLastSync(lastSync); + }, + + async _syncStartup() { + await super._syncStartup(); + + try { + // For first syncs, back up the user's bookmarks. + let lastSync = await this.getLastSync(); + if (!lastSync) { + this._log.debug("Bookmarks backup starting"); + await lazy.PlacesBackups.create(null, true); + this._log.debug("Bookmarks backup done"); + } + } catch (ex) { + // Failure to create a backup is somewhat bad, but probably not bad + // enough to prevent syncing of bookmarks - so just log the error and + // continue. + this._log.warn( + "Error while backing up bookmarks, but continuing with sync", + ex + ); + } + }, + + async _sync() { + try { + await super._sync(); + if (this._ranMaintenanceOnLastSync) { + // If the last sync failed, we ran maintenance, and this sync succeeded, + // maintenance likely fixed the issue. + this._ranMaintenanceOnLastSync = false; + this.service.recordTelemetryEvent("maintenance", "fix", "bookmarks"); + } + } catch (ex) { + if ( + Async.isShutdownException(ex) || + ex.status > 0 || + ex.name == "InterruptedError" + ) { + // Don't run maintenance on shutdown or HTTP errors, or if we aborted + // the sync because the user changed their bookmarks during merging. + throw ex; + } + if (ex.name == "MergeConflictError") { + this._log.warn( + "Bookmark syncing ran into a merge conflict error...will retry later" + ); + return; + } + // Run Places maintenance periodically to try to recover from corruption + // that might have caused the sync to fail. We cap the interval because + // persistent failures likely indicate a problem that won't be fixed by + // running maintenance after every failed sync. + let elapsedSinceMaintenance = + Date.now() / 1000 - + Services.prefs.getIntPref("places.database.lastMaintenance", 0); + if (elapsedSinceMaintenance >= PLACES_MAINTENANCE_INTERVAL_SECONDS) { + this._log.error( + "Bookmark sync failed, ${elapsedSinceMaintenance}s " + + "elapsed since last run; running Places maintenance", + { elapsedSinceMaintenance } + ); + await lazy.PlacesDBUtils.maintenanceOnIdle(); + this._ranMaintenanceOnLastSync = true; + this.service.recordTelemetryEvent("maintenance", "run", "bookmarks"); + } else { + this._ranMaintenanceOnLastSync = false; + } + throw ex; + } + }, + + async _syncFinish() { + await SyncEngine.prototype._syncFinish.call(this); + await lazy.PlacesSyncUtils.bookmarks.ensureMobileQuery(); + }, + + async pullAllChanges() { + return this.pullNewChanges(); + }, + + async trackRemainingChanges() { + let changes = this._modified.changes; + await lazy.PlacesSyncUtils.bookmarks.pushChanges(changes); + }, + + _deleteId(id) { + this._noteDeletedId(id); + }, + + // The bookmarks engine rarely calls this method directly, except in tests or + // when handling a `reset{All, Engine}` command from another client. We + // usually reset local Sync metadata on a sync ID mismatch, which both engines + // override with logic that lives in Places and the mirror. + async _resetClient() { + await super._resetClient(); + await lazy.PlacesSyncUtils.bookmarks.reset(); + let buf = await this._store.ensureOpenMirror(); + await buf.reset(); + }, + + // Cleans up the Places root, reading list items (ignored in bug 762118, + // removed in bug 1155684), and pinned sites. + _shouldDeleteRemotely(incomingItem) { + return ( + FORBIDDEN_INCOMING_IDS.includes(incomingItem.id) || + FORBIDDEN_INCOMING_PARENT_IDS.includes(incomingItem.parentid) + ); + }, + + emptyChangeset() { + return new BookmarksChangeset(); + }, + + async _apply() { + let buf = await this._store.ensureOpenMirror(); + let watchdog = this._newWatchdog(); + watchdog.start(BOOKMARK_APPLY_TIMEOUT_MS); + + try { + let recordsToUpload = await buf.apply({ + remoteTimeSeconds: lazy.Resource.serverTime, + signal: watchdog.signal, + }); + this._modified.replace(recordsToUpload); + } finally { + watchdog.stop(); + if (watchdog.abortReason) { + this._log.warn(`Aborting bookmark merge: ${watchdog.abortReason}`); + } + } + }, + + async _processIncoming(newitems) { + await super._processIncoming(newitems); + await this._apply(); + }, + + async _reconcile(item) { + return true; + }, + + async _createRecord(id) { + let record = await this._doCreateRecord(id); + if (!record.deleted) { + // Set hasDupe on all (non-deleted) records since we don't use it and we + // want to minimize the risk of older clients corrupting records. Note + // that the SyncedBookmarksMirror sets it for all records that it created, + // but we would like to ensure that weakly uploaded records are marked as + // hasDupe as well. + record.hasDupe = true; + } + return record; + }, + + async _doCreateRecord(id) { + let change = this._modified.changes[id]; + if (!change) { + this._log.error( + "Creating record for item ${id} not in strong changeset", + { id } + ); + throw new TypeError("Can't create record for unchanged item"); + } + let record = this._recordFromCleartext(id, change.cleartext); + record.sortindex = await this._store._calculateIndex(record); + return record; + }, + + _recordFromCleartext(id, cleartext) { + let recordObj = getTypeObject(cleartext.type); + if (!recordObj) { + this._log.warn( + "Creating record for item ${id} with unknown type ${type}", + { id, type: cleartext.type } + ); + recordObj = PlacesItem; + } + let record = new recordObj(this.name, id); + record.cleartext = cleartext; + return record; + }, + + async pullChanges() { + return {}; + }, + + /** + * Writes successfully uploaded records back to the mirror, so that the + * mirror matches the server. We update the mirror before updating Places, + * which has implications for interrupted syncs. + * + * 1. Sync interrupted during upload; server doesn't support atomic uploads. + * We'll download and reapply everything that we uploaded before the + * interruption. All locally changed items retain their change counters. + * 2. Sync interrupted during upload; atomic uploads enabled. The server + * discards the batch. All changed local items retain their change + * counters, so the next sync resumes cleanly. + * 3. Sync interrupted during upload; outgoing records can't fit in a single + * batch. We'll download and reapply all records through the most recent + * committed batch. This is a variation of (1). + * 4. Sync interrupted after we update the mirror, but before cleanup. The + * mirror matches the server, but locally changed items retain their change + * counters. Reuploading them on the next sync should be idempotent, though + * unnecessary. If another client makes a conflicting remote change before + * we sync again, we may incorrectly prefer the local state. + * 5. Sync completes successfully. We'll update the mirror, and reset the + * change counters for all items. + */ + async _onRecordsWritten(succeeded, failed, serverModifiedTime) { + let records = []; + for (let id of succeeded) { + let change = this._modified.changes[id]; + if (!change) { + // TODO (Bug 1433178): Write weakly uploaded records back to the mirror. + this._log.info("Uploaded record not in strong changeset", id); + continue; + } + if (!change.synced) { + this._log.info("Record in strong changeset not uploaded", id); + continue; + } + let cleartext = change.cleartext; + if (!cleartext) { + this._log.error( + "Missing Sync record cleartext for ${id} in ${change}", + { id, change } + ); + throw new TypeError("Missing cleartext for uploaded Sync record"); + } + let record = this._recordFromCleartext(id, cleartext); + record.modified = serverModifiedTime; + records.push(record); + } + let buf = await this._store.ensureOpenMirror(); + await buf.store(records, { needsMerge: false }); + }, + + async finalize() { + await super.finalize(); + await this._store.finalize(); + }, +}; + +Object.setPrototypeOf(BookmarksEngine.prototype, SyncEngine.prototype); + +/** + * The bookmarks store delegates to the mirror for staging and applying + * records. Most `Store` methods intentionally remain abstract, so you can't use + * this store to create or update bookmarks in Places. All changes must go + * through the mirror, which takes care of merging and producing a valid tree. + */ +function BookmarksStore(name, engine) { + Store.call(this, name, engine); +} + +BookmarksStore.prototype = { + _openMirrorPromise: null, + + // For tests. + _batchChunkSize: 500, + + // Create a record starting from the weave id (places guid) + async createRecord(id, collection) { + let item = await lazy.PlacesSyncUtils.bookmarks.fetch(id); + if (!item) { + // deleted item + let record = new PlacesItem(collection, id); + record.deleted = true; + return record; + } + + let recordObj = getTypeObject(item.kind); + if (!recordObj) { + this._log.warn("Unknown item type, cannot serialize: " + item.kind); + recordObj = PlacesItem; + } + let record = new recordObj(collection, id); + record.fromSyncBookmark(item); + + record.sortindex = await this._calculateIndex(record); + + return record; + }, + + async _calculateIndex(record) { + // Ensure folders have a very high sort index so they're not synced last. + if (record.type == "folder") { + return FOLDER_SORTINDEX; + } + + // For anything directly under the toolbar, give it a boost of more than an + // unvisited bookmark + let index = 0; + if (record.parentid == "toolbar") { + index += 150; + } + + // Add in the bookmark's frecency if we have something. + if (record.bmkUri != null) { + let frecency = FRECENCY_UNKNOWN; + try { + frecency = await lazy.PlacesSyncUtils.history.fetchURLFrecency( + record.bmkUri + ); + } catch (ex) { + this._log.warn( + `Failed to fetch frecency for ${record.id}; assuming default`, + ex + ); + this._log.trace("Record {id} has invalid URL ${bmkUri}", record); + } + if (frecency != FRECENCY_UNKNOWN) { + index += frecency; + } + } + + return index; + }, + + async wipe() { + // Save a backup before clearing out all bookmarks. + await lazy.PlacesBackups.create(null, true); + await lazy.PlacesSyncUtils.bookmarks.wipe(); + }, + + ensureOpenMirror() { + if (!this._openMirrorPromise) { + this._openMirrorPromise = this._openMirror().catch(err => { + // We may have failed to open the mirror temporarily; for example, if + // the database is locked. Clear the promise so that subsequent + // `ensureOpenMirror` calls can try to open the mirror again. + this._openMirrorPromise = null; + throw err; + }); + } + return this._openMirrorPromise; + }, + + async _openMirror() { + let mirrorPath = PathUtils.join( + PathUtils.profileDir, + "weave", + "bookmarks.sqlite" + ); + await IOUtils.makeDirectory(PathUtils.parent(mirrorPath), { + createAncestors: true, + }); + + return lazy.SyncedBookmarksMirror.open({ + path: mirrorPath, + recordStepTelemetry: (name, took, counts) => { + lazy.Observers.notify( + "weave:engine:sync:step", + { + name, + took, + counts, + }, + this.name + ); + }, + recordValidationTelemetry: (took, checked, problems) => { + lazy.Observers.notify( + "weave:engine:validate:finish", + { + version: BOOKMARK_VALIDATOR_VERSION, + took, + checked, + problems, + }, + this.name + ); + }, + }); + }, + + async applyIncomingBatch(records, countTelemetry) { + let buf = await this.ensureOpenMirror(); + for (let chunk of lazy.PlacesUtils.chunkArray( + records, + this._batchChunkSize + )) { + await buf.store(chunk); + } + // Array of failed records. + return []; + }, + + async applyIncoming(record) { + let buf = await this.ensureOpenMirror(); + await buf.store([record]); + }, + + async finalize() { + if (!this._openMirrorPromise) { + return; + } + let buf = await this._openMirrorPromise; + await buf.finalize(); + }, +}; + +Object.setPrototypeOf(BookmarksStore.prototype, Store.prototype); + +// The bookmarks tracker is a special flower. Instead of listening for changes +// via observer notifications, it queries Places for the set of items that have +// changed since the last sync. Because it's a "pull-based" tracker, it ignores +// all concepts of "add a changed ID." However, it still registers an observer +// to bump the score, so that changed bookmarks are synced immediately. +function BookmarksTracker(name, engine) { + Tracker.call(this, name, engine); +} +BookmarksTracker.prototype = { + onStart() { + this._placesListener = new PlacesWeakCallbackWrapper( + this.handlePlacesEvents.bind(this) + ); + lazy.PlacesUtils.observers.addListener( + [ + "bookmark-added", + "bookmark-removed", + "bookmark-moved", + "bookmark-guid-changed", + "bookmark-keyword-changed", + "bookmark-tags-changed", + "bookmark-time-changed", + "bookmark-title-changed", + "bookmark-url-changed", + ], + this._placesListener + ); + Svc.Obs.add("bookmarks-restore-begin", this); + Svc.Obs.add("bookmarks-restore-success", this); + Svc.Obs.add("bookmarks-restore-failed", this); + }, + + onStop() { + lazy.PlacesUtils.observers.removeListener( + [ + "bookmark-added", + "bookmark-removed", + "bookmark-moved", + "bookmark-guid-changed", + "bookmark-keyword-changed", + "bookmark-tags-changed", + "bookmark-time-changed", + "bookmark-title-changed", + "bookmark-url-changed", + ], + this._placesListener + ); + Svc.Obs.remove("bookmarks-restore-begin", this); + Svc.Obs.remove("bookmarks-restore-success", this); + Svc.Obs.remove("bookmarks-restore-failed", this); + }, + + async getChangedIDs() { + return lazy.PlacesSyncUtils.bookmarks.pullChanges(); + }, + + observe(subject, topic, data) { + switch (topic) { + case "bookmarks-restore-begin": + this._log.debug("Ignoring changes from importing bookmarks."); + break; + case "bookmarks-restore-success": + this._log.debug("Tracking all items on successful import."); + + if (data == "json") { + this._log.debug( + "Restore succeeded: wiping server and other clients." + ); + // Trigger an immediate sync. `ensureCurrentSyncID` will notice we + // restored, wipe the server and other clients, reset the sync ID, and + // upload the restored tree. + this.score += SCORE_INCREMENT_XLARGE; + } else { + // "html", "html-initial", or "json-append" + this._log.debug("Import succeeded."); + } + break; + case "bookmarks-restore-failed": + this._log.debug("Tracking all items on failed import."); + break; + } + }, + + QueryInterface: ChromeUtils.generateQI(["nsISupportsWeakReference"]), + + /* Every add/remove/change will trigger a sync for MULTI_DEVICE */ + _upScore: function BMT__upScore() { + this.score += SCORE_INCREMENT_XLARGE; + }, + + handlePlacesEvents(events) { + for (let event of events) { + switch (event.type) { + case "bookmark-added": + case "bookmark-removed": + case "bookmark-moved": + case "bookmark-keyword-changed": + case "bookmark-tags-changed": + case "bookmark-time-changed": + case "bookmark-title-changed": + case "bookmark-url-changed": + if (lazy.IGNORED_SOURCES.includes(event.source)) { + continue; + } + + this._log.trace(`'${event.type}': ${event.id}`); + this._upScore(); + break; + case "bookmark-guid-changed": + if (event.source !== lazy.PlacesUtils.bookmarks.SOURCES.SYNC) { + this._log.warn( + "The source of bookmark-guid-changed event shoud be sync." + ); + continue; + } + + this._log.trace(`'${event.type}': ${event.id}`); + this._upScore(); + break; + case "purge-caches": + this._log.trace("purge-caches"); + this._upScore(); + break; + } + } + }, +}; + +Object.setPrototypeOf(BookmarksTracker.prototype, Tracker.prototype); + +/** + * A changeset that stores extra metadata in a change record for each ID. The + * engine updates this metadata when uploading Sync records, and writes it back + * to Places in `BookmarksEngine#trackRemainingChanges`. + * + * The `synced` property on a change record means its corresponding item has + * been uploaded, and we should pretend it doesn't exist in the changeset. + */ +class BookmarksChangeset extends Changeset { + // Only `_reconcile` calls `getModifiedTimestamp` and `has`, and the engine + // does its own reconciliation. + getModifiedTimestamp(id) { + throw new Error("Don't use timestamps to resolve bookmark conflicts"); + } + + has(id) { + throw new Error("Don't use the changeset to resolve bookmark conflicts"); + } + + delete(id) { + let change = this.changes[id]; + if (change) { + // Mark the change as synced without removing it from the set. We do this + // so that we can update Places in `trackRemainingChanges`. + change.synced = true; + } + } + + ids() { + let results = new Set(); + for (let id in this.changes) { + if (!this.changes[id].synced) { + results.add(id); + } + } + return [...results]; + } +} diff --git a/services/sync/modules/engines/clients.sys.mjs b/services/sync/modules/engines/clients.sys.mjs new file mode 100644 index 0000000000..f3cebc8a54 --- /dev/null +++ b/services/sync/modules/engines/clients.sys.mjs @@ -0,0 +1,1123 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * How does the clients engine work? + * + * - We use 2 files - commands.json and commands-syncing.json. + * + * - At sync upload time, we attempt a rename of commands.json to + * commands-syncing.json, and ignore errors (helps for crash during sync!). + * - We load commands-syncing.json and stash the contents in + * _currentlySyncingCommands which lives for the duration of the upload process. + * - We use _currentlySyncingCommands to build the outgoing records + * - Immediately after successful upload, we delete commands-syncing.json from + * disk (and clear _currentlySyncingCommands). We reconcile our local records + * with what we just wrote in the server, and add failed IDs commands + * back in commands.json + * - Any time we need to "save" a command for future syncs, we load + * commands.json, update it, and write it back out. + */ + +import { Async } from "resource://services-common/async.sys.mjs"; + +import { + DEVICE_TYPE_DESKTOP, + DEVICE_TYPE_MOBILE, + SINGLE_USER_THRESHOLD, + SYNC_API_VERSION, +} from "resource://services-sync/constants.sys.mjs"; + +import { + Store, + SyncEngine, + LegacyTracker, +} from "resource://services-sync/engines.sys.mjs"; +import { CryptoWrapper } from "resource://services-sync/record.sys.mjs"; +import { Resource } from "resource://services-sync/resource.sys.mjs"; +import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; + +const lazy = {}; + +XPCOMUtils.defineLazyGetter(lazy, "fxAccounts", () => { + return ChromeUtils.importESModule( + "resource://gre/modules/FxAccounts.sys.mjs" + ).getFxAccountsSingleton(); +}); + +const { PREF_ACCOUNT_ROOT } = ChromeUtils.import( + "resource://gre/modules/FxAccountsCommon.js" +); + +const CLIENTS_TTL = 15552000; // 180 days +const CLIENTS_TTL_REFRESH = 604800; // 7 days +const STALE_CLIENT_REMOTE_AGE = 604800; // 7 days + +// TTL of the message sent to another device when sending a tab +const NOTIFY_TAB_SENT_TTL_SECS = 1 * 3600; // 1 hour + +// How often we force a refresh of the FxA device list. +const REFRESH_FXA_DEVICE_INTERVAL_MS = 2 * 60 * 60 * 1000; // 2 hours + +// Reasons behind sending collection_changed push notifications. +const COLLECTION_MODIFIED_REASON_SENDTAB = "sendtab"; +const COLLECTION_MODIFIED_REASON_FIRSTSYNC = "firstsync"; + +const SUPPORTED_PROTOCOL_VERSIONS = [SYNC_API_VERSION]; +const LAST_MODIFIED_ON_PROCESS_COMMAND_PREF = + "services.sync.clients.lastModifiedOnProcessCommands"; + +function hasDupeCommand(commands, action) { + if (!commands) { + return false; + } + return commands.some( + other => + other.command == action.command && + Utils.deepEquals(other.args, action.args) + ); +} + +export function ClientsRec(collection, id) { + CryptoWrapper.call(this, collection, id); +} + +ClientsRec.prototype = { + _logName: "Sync.Record.Clients", + ttl: CLIENTS_TTL, +}; +Object.setPrototypeOf(ClientsRec.prototype, CryptoWrapper.prototype); + +Utils.deferGetSet(ClientsRec, "cleartext", [ + "name", + "type", + "commands", + "version", + "protocols", + "formfactor", + "os", + "appPackage", + "application", + "device", + "fxaDeviceId", +]); + +export function ClientEngine(service) { + SyncEngine.call(this, "Clients", service); + + this.fxAccounts = lazy.fxAccounts; + this.addClientCommandQueue = Async.asyncQueueCaller(this._log); + Utils.defineLazyIDProperty(this, "localID", "services.sync.client.GUID"); +} + +ClientEngine.prototype = { + _storeObj: ClientStore, + _recordObj: ClientsRec, + _trackerObj: ClientsTracker, + allowSkippedRecord: false, + _knownStaleFxADeviceIds: null, + _lastDeviceCounts: null, + _lastFxaDeviceRefresh: 0, + + async initialize() { + // Reset the last sync timestamp on every startup so that we fetch all clients + await this.resetLastSync(); + }, + + // These two properties allow us to avoid replaying the same commands + // continuously if we cannot manage to upload our own record. + _localClientLastModified: 0, + get _lastModifiedOnProcessCommands() { + return Services.prefs.getIntPref(LAST_MODIFIED_ON_PROCESS_COMMAND_PREF, -1); + }, + + set _lastModifiedOnProcessCommands(value) { + Services.prefs.setIntPref(LAST_MODIFIED_ON_PROCESS_COMMAND_PREF, value); + }, + + get isFirstSync() { + return !this.lastRecordUpload; + }, + + // Always sync client data as it controls other sync behavior + get enabled() { + return true; + }, + + get lastRecordUpload() { + return Svc.Prefs.get(this.name + ".lastRecordUpload", 0); + }, + set lastRecordUpload(value) { + Svc.Prefs.set(this.name + ".lastRecordUpload", Math.floor(value)); + }, + + get remoteClients() { + // return all non-stale clients for external consumption. + return Object.values(this._store._remoteClients).filter(v => !v.stale); + }, + + remoteClient(id) { + let client = this._store._remoteClients[id]; + return client && !client.stale ? client : null; + }, + + remoteClientExists(id) { + return !!this.remoteClient(id); + }, + + // Aggregate some stats on the composition of clients on this account + get stats() { + let stats = { + hasMobile: this.localType == DEVICE_TYPE_MOBILE, + names: [this.localName], + numClients: 1, + }; + + for (let id in this._store._remoteClients) { + let { name, type, stale } = this._store._remoteClients[id]; + if (!stale) { + stats.hasMobile = stats.hasMobile || type == DEVICE_TYPE_MOBILE; + stats.names.push(name); + stats.numClients++; + } + } + + return stats; + }, + + /** + * Obtain information about device types. + * + * Returns a Map of device types to integer counts. Guaranteed to include + * "desktop" (which will have at least 1 - this device) and "mobile" (which + * may have zero) counts. It almost certainly will include only these 2. + */ + get deviceTypes() { + let counts = new Map(); + + counts.set(this.localType, 1); // currently this must be DEVICE_TYPE_DESKTOP + counts.set(DEVICE_TYPE_MOBILE, 0); + + for (let id in this._store._remoteClients) { + let record = this._store._remoteClients[id]; + if (record.stale) { + continue; // pretend "stale" records don't exist. + } + let type = record.type; + if (!counts.has(type)) { + counts.set(type, 0); + } + + counts.set(type, counts.get(type) + 1); + } + + return counts; + }, + + get brandName() { + let brand = Services.strings.createBundle( + "chrome://branding/locale/brand.properties" + ); + return brand.GetStringFromName("brandShortName"); + }, + + get localName() { + return this.fxAccounts.device.getLocalName(); + }, + set localName(value) { + this.fxAccounts.device.setLocalName(value); + }, + + get localType() { + return this.fxAccounts.device.getLocalType(); + }, + + getClientName(id) { + if (id == this.localID) { + return this.localName; + } + let client = this._store._remoteClients[id]; + if (!client) { + return ""; + } + // Sometimes the sync clients don't always correctly update the device name + // However FxA always does, so try to pull the name from there first + let fxaDevice = this.fxAccounts.device.recentDeviceList?.find( + device => device.id === client.fxaDeviceId + ); + + // should be very rare, but could happen if we have yet to fetch devices, + // or the client recently disconnected + if (!fxaDevice) { + this._log.warn( + "Couldn't find associated FxA device, falling back to client name" + ); + return client.name; + } + return fxaDevice.name; + }, + + getClientFxaDeviceId(id) { + if (this._store._remoteClients[id]) { + return this._store._remoteClients[id].fxaDeviceId; + } + return null; + }, + + getClientByFxaDeviceId(fxaDeviceId) { + for (let id in this._store._remoteClients) { + let client = this._store._remoteClients[id]; + if (client.stale) { + continue; + } + if (client.fxaDeviceId == fxaDeviceId) { + return client; + } + } + return null; + }, + + getClientType(id) { + const client = this._store._remoteClients[id]; + if (client.type == DEVICE_TYPE_DESKTOP) { + return "desktop"; + } + if (client.formfactor && client.formfactor.includes("tablet")) { + return "tablet"; + } + return "phone"; + }, + + async _readCommands() { + let commands = await Utils.jsonLoad("commands", this); + return commands || {}; + }, + + /** + * Low level function, do not use directly (use _addClientCommand instead). + */ + async _saveCommands(commands) { + try { + await Utils.jsonSave("commands", this, commands); + } catch (error) { + this._log.error("Failed to save JSON outgoing commands", error); + } + }, + + async _prepareCommandsForUpload() { + try { + await Utils.jsonMove("commands", "commands-syncing", this); + } catch (e) { + // Ignore errors + } + let commands = await Utils.jsonLoad("commands-syncing", this); + return commands || {}; + }, + + async _deleteUploadedCommands() { + delete this._currentlySyncingCommands; + try { + await Utils.jsonRemove("commands-syncing", this); + } catch (err) { + this._log.error("Failed to delete syncing-commands file", err); + } + }, + + // Gets commands for a client we are yet to write to the server. Doesn't + // include commands for that client which are already on the server. + // We should rename this! + async getClientCommands(clientId) { + const allCommands = await this._readCommands(); + return allCommands[clientId] || []; + }, + + async removeLocalCommand(command) { + // the implementation of this engine is such that adding a command to + // the local client is how commands are deleted! ¯\_(ツ)_/¯ + await this._addClientCommand(this.localID, command); + }, + + async _addClientCommand(clientId, command) { + this.addClientCommandQueue.enqueueCall(async () => { + try { + const localCommands = await this._readCommands(); + const localClientCommands = localCommands[clientId] || []; + const remoteClient = this._store._remoteClients[clientId]; + let remoteClientCommands = []; + if (remoteClient && remoteClient.commands) { + remoteClientCommands = remoteClient.commands; + } + const clientCommands = localClientCommands.concat(remoteClientCommands); + if (hasDupeCommand(clientCommands, command)) { + return false; + } + localCommands[clientId] = localClientCommands.concat(command); + await this._saveCommands(localCommands); + return true; + } catch (e) { + // Failing to save a command should not "break the queue" of pending operations. + this._log.error(e); + return false; + } + }); + + return this.addClientCommandQueue.promiseCallsComplete(); + }, + + async _removeClientCommands(clientId) { + const allCommands = await this._readCommands(); + delete allCommands[clientId]; + await this._saveCommands(allCommands); + }, + + async updateKnownStaleClients() { + this._log.debug("Updating the known stale clients"); + // _fetchFxADevices side effect updates this._knownStaleFxADeviceIds. + await this._fetchFxADevices(); + let localFxADeviceId = await lazy.fxAccounts.device.getLocalId(); + // Process newer records first, so that if we hit a record with a device ID + // we've seen before, we can mark it stale immediately. + let clientList = Object.values(this._store._remoteClients).sort( + (a, b) => b.serverLastModified - a.serverLastModified + ); + let seenDeviceIds = new Set([localFxADeviceId]); + for (let client of clientList) { + // Clients might not have an `fxaDeviceId` if they fail the FxA + // registration process. + if (!client.fxaDeviceId) { + continue; + } + if (this._knownStaleFxADeviceIds.includes(client.fxaDeviceId)) { + this._log.info( + `Hiding stale client ${client.id} - in known stale clients list` + ); + client.stale = true; + } else if (seenDeviceIds.has(client.fxaDeviceId)) { + this._log.info( + `Hiding stale client ${client.id}` + + ` - duplicate device id ${client.fxaDeviceId}` + ); + client.stale = true; + } else { + seenDeviceIds.add(client.fxaDeviceId); + } + } + }, + + async _fetchFxADevices() { + // We only force a refresh periodically to keep the load on the servers + // down, and because we expect FxA to have received a push message in + // most cases when the FxA device list would have changed. For this reason + // we still go ahead and check the stale list even if we didn't force a + // refresh. + let now = this.fxAccounts._internal.now(); // tests mock this .now() impl. + if (now - REFRESH_FXA_DEVICE_INTERVAL_MS > this._lastFxaDeviceRefresh) { + this._lastFxaDeviceRefresh = now; + try { + await this.fxAccounts.device.refreshDeviceList(); + } catch (e) { + this._log.error("Could not refresh the FxA device list", e); + } + } + + // We assume that clients not present in the FxA Device Manager list have been + // disconnected and so are stale + this._log.debug("Refreshing the known stale clients list"); + let localClients = Object.values(this._store._remoteClients) + .filter(client => client.fxaDeviceId) // iOS client records don't have fxaDeviceId + .map(client => client.fxaDeviceId); + const fxaClients = this.fxAccounts.device.recentDeviceList + ? this.fxAccounts.device.recentDeviceList.map(device => device.id) + : []; + this._knownStaleFxADeviceIds = Utils.arraySub(localClients, fxaClients); + }, + + async _syncStartup() { + // Reupload new client record periodically. + if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) { + await this._tracker.addChangedID(this.localID); + } + return SyncEngine.prototype._syncStartup.call(this); + }, + + async _processIncoming() { + // Fetch all records from the server. + await this.resetLastSync(); + this._incomingClients = {}; + try { + await SyncEngine.prototype._processIncoming.call(this); + // Update FxA Device list. + await this._fetchFxADevices(); + // Since clients are synced unconditionally, any records in the local store + // that don't exist on the server must be for disconnected clients. Remove + // them, so that we don't upload records with commands for clients that will + // never see them. We also do this to filter out stale clients from the + // tabs collection, since showing their list of tabs is confusing. + for (let id in this._store._remoteClients) { + if (!this._incomingClients[id]) { + this._log.info(`Removing local state for deleted client ${id}`); + await this._removeRemoteClient(id); + } + } + let localFxADeviceId = await lazy.fxAccounts.device.getLocalId(); + // Bug 1264498: Mobile clients don't remove themselves from the clients + // collection when the user disconnects Sync, so we mark as stale clients + // with the same name that haven't synced in over a week. + // (Note we can't simply delete them, or we re-apply them next sync - see + // bug 1287687) + this._localClientLastModified = Math.round( + this._incomingClients[this.localID] + ); + delete this._incomingClients[this.localID]; + let names = new Set([this.localName]); + let seenDeviceIds = new Set([localFxADeviceId]); + let idToLastModifiedList = Object.entries(this._incomingClients).sort( + (a, b) => b[1] - a[1] + ); + for (let [id, serverLastModified] of idToLastModifiedList) { + let record = this._store._remoteClients[id]; + // stash the server last-modified time on the record. + record.serverLastModified = serverLastModified; + if ( + record.fxaDeviceId && + this._knownStaleFxADeviceIds.includes(record.fxaDeviceId) + ) { + this._log.info( + `Hiding stale client ${id} - in known stale clients list` + ); + record.stale = true; + } + if (!names.has(record.name)) { + if (record.fxaDeviceId) { + seenDeviceIds.add(record.fxaDeviceId); + } + names.add(record.name); + continue; + } + let remoteAge = Resource.serverTime - this._incomingClients[id]; + if (remoteAge > STALE_CLIENT_REMOTE_AGE) { + this._log.info(`Hiding stale client ${id} with age ${remoteAge}`); + record.stale = true; + continue; + } + if (record.fxaDeviceId && seenDeviceIds.has(record.fxaDeviceId)) { + this._log.info( + `Hiding stale client ${record.id}` + + ` - duplicate device id ${record.fxaDeviceId}` + ); + record.stale = true; + } else if (record.fxaDeviceId) { + seenDeviceIds.add(record.fxaDeviceId); + } + } + } finally { + this._incomingClients = null; + } + }, + + async _uploadOutgoing() { + this._currentlySyncingCommands = await this._prepareCommandsForUpload(); + const clientWithPendingCommands = Object.keys( + this._currentlySyncingCommands + ); + for (let clientId of clientWithPendingCommands) { + if (this._store._remoteClients[clientId] || this.localID == clientId) { + this._modified.set(clientId, 0); + } + } + let updatedIDs = this._modified.ids(); + await SyncEngine.prototype._uploadOutgoing.call(this); + // Record the response time as the server time for each item we uploaded. + let lastSync = await this.getLastSync(); + for (let id of updatedIDs) { + if (id == this.localID) { + this.lastRecordUpload = lastSync; + } else { + this._store._remoteClients[id].serverLastModified = lastSync; + } + } + }, + + async _onRecordsWritten(succeeded, failed) { + // Reconcile the status of the local records with what we just wrote on the + // server + for (let id of succeeded) { + const commandChanges = this._currentlySyncingCommands[id]; + if (id == this.localID) { + if (this.isFirstSync) { + this._log.info( + "Uploaded our client record for the first time, notifying other clients." + ); + this._notifyClientRecordUploaded(); + } + if (this.localCommands) { + this.localCommands = this.localCommands.filter( + command => !hasDupeCommand(commandChanges, command) + ); + } + } else { + const clientRecord = this._store._remoteClients[id]; + if (!commandChanges || !clientRecord) { + // should be impossible, else we wouldn't have been writing it. + this._log.warn( + "No command/No record changes for a client we uploaded" + ); + continue; + } + // fixup the client record, so our copy of _remoteClients matches what we uploaded. + this._store._remoteClients[id] = await this._store.createRecord(id); + // we could do better and pass the reference to the record we just uploaded, + // but this will do for now + } + } + + // Re-add failed commands + for (let id of failed) { + const commandChanges = this._currentlySyncingCommands[id]; + if (!commandChanges) { + continue; + } + await this._addClientCommand(id, commandChanges); + } + + await this._deleteUploadedCommands(); + + // Notify other devices that their own client collection changed + const idsToNotify = succeeded.reduce((acc, id) => { + if (id == this.localID) { + return acc; + } + const fxaDeviceId = this.getClientFxaDeviceId(id); + return fxaDeviceId ? acc.concat(fxaDeviceId) : acc; + }, []); + if (idsToNotify.length) { + this._notifyOtherClientsModified(idsToNotify); + } + }, + + _notifyOtherClientsModified(ids) { + // We are not waiting on this promise on purpose. + this._notifyCollectionChanged( + ids, + NOTIFY_TAB_SENT_TTL_SECS, + COLLECTION_MODIFIED_REASON_SENDTAB + ); + }, + + _notifyClientRecordUploaded() { + // We are not waiting on this promise on purpose. + this._notifyCollectionChanged( + null, + 0, + COLLECTION_MODIFIED_REASON_FIRSTSYNC + ); + }, + + /** + * @param {?string[]} ids FxA Client IDs to notify. null means everyone else. + * @param {number} ttl TTL of the push notification. + * @param {string} reason Reason for sending this push notification. + */ + async _notifyCollectionChanged(ids, ttl, reason) { + const message = { + version: 1, + command: "sync:collection_changed", + data: { + collections: ["clients"], + reason, + }, + }; + let excludedIds = null; + if (!ids) { + const localFxADeviceId = await lazy.fxAccounts.device.getLocalId(); + excludedIds = [localFxADeviceId]; + } + try { + await this.fxAccounts.notifyDevices(ids, excludedIds, message, ttl); + } catch (e) { + this._log.error("Could not notify of changes in the collection", e); + } + }, + + async _syncFinish() { + // Record histograms for our device types, and also write them to a pref + // so non-histogram telemetry (eg, UITelemetry) and the sync scheduler + // has easy access to them, and so they are accurate even before we've + // successfully synced the first time after startup. + let deviceTypeCounts = this.deviceTypes; + for (let [deviceType, count] of deviceTypeCounts) { + let hid; + let prefName = this.name + ".devices."; + switch (deviceType) { + case DEVICE_TYPE_DESKTOP: + hid = "WEAVE_DEVICE_COUNT_DESKTOP"; + prefName += "desktop"; + break; + case DEVICE_TYPE_MOBILE: + hid = "WEAVE_DEVICE_COUNT_MOBILE"; + prefName += "mobile"; + break; + default: + this._log.warn( + `Unexpected deviceType "${deviceType}" recording device telemetry.` + ); + continue; + } + Services.telemetry.getHistogramById(hid).add(count); + // Optimization: only write the pref if it changed since our last sync. + if ( + this._lastDeviceCounts == null || + this._lastDeviceCounts.get(prefName) != count + ) { + Svc.Prefs.set(prefName, count); + } + } + this._lastDeviceCounts = deviceTypeCounts; + return SyncEngine.prototype._syncFinish.call(this); + }, + + async _reconcile(item) { + // Every incoming record is reconciled, so we use this to track the + // contents of the collection on the server. + this._incomingClients[item.id] = item.modified; + + if (!(await this._store.itemExists(item.id))) { + return true; + } + // Clients are synced unconditionally, so we'll always have new records. + // Unfortunately, this will cause the scheduler to use the immediate sync + // interval for the multi-device case, instead of the active interval. We + // work around this by updating the record during reconciliation, and + // returning false to indicate that the record doesn't need to be applied + // later. + await this._store.update(item); + return false; + }, + + // Treat reset the same as wiping for locally cached clients + async _resetClient() { + await this._wipeClient(); + }, + + async _wipeClient() { + await SyncEngine.prototype._resetClient.call(this); + this._knownStaleFxADeviceIds = null; + delete this.localCommands; + await this._store.wipe(); + try { + await Utils.jsonRemove("commands", this); + } catch (err) { + this._log.warn("Could not delete commands.json", err); + } + try { + await Utils.jsonRemove("commands-syncing", this); + } catch (err) { + this._log.warn("Could not delete commands-syncing.json", err); + } + }, + + async removeClientData() { + let res = this.service.resource(this.engineURL + "/" + this.localID); + await res.delete(); + }, + + // Override the default behavior to delete bad records from the server. + async handleHMACMismatch(item, mayRetry) { + this._log.debug("Handling HMAC mismatch for " + item.id); + + let base = await SyncEngine.prototype.handleHMACMismatch.call( + this, + item, + mayRetry + ); + if (base != SyncEngine.kRecoveryStrategy.error) { + return base; + } + + // It's a bad client record. Save it to be deleted at the end of the sync. + this._log.debug("Bad client record detected. Scheduling for deletion."); + await this._deleteId(item.id); + + // Neither try again nor error; we're going to delete it. + return SyncEngine.kRecoveryStrategy.ignore; + }, + + /** + * A hash of valid commands that the client knows about. The key is a command + * and the value is a hash containing information about the command such as + * number of arguments, description, and importance (lower importance numbers + * indicate higher importance. + */ + _commands: { + resetAll: { + args: 0, + importance: 0, + desc: "Clear temporary local data for all engines", + }, + resetEngine: { + args: 1, + importance: 0, + desc: "Clear temporary local data for engine", + }, + wipeEngine: { + args: 1, + importance: 0, + desc: "Delete all client data for engine", + }, + logout: { args: 0, importance: 0, desc: "Log out client" }, + }, + + /** + * Sends a command+args pair to a specific client. + * + * @param command Command string + * @param args Array of arguments/data for command + * @param clientId Client to send command to + */ + async _sendCommandToClient(command, args, clientId, telemetryExtra) { + this._log.trace("Sending " + command + " to " + clientId); + + let client = this._store._remoteClients[clientId]; + if (!client) { + throw new Error("Unknown remote client ID: '" + clientId + "'."); + } + if (client.stale) { + throw new Error("Stale remote client ID: '" + clientId + "'."); + } + + let action = { + command, + args, + // We send the flowID to the other client so *it* can report it in its + // telemetry - we record it in ours below. + flowID: telemetryExtra.flowID, + }; + + if (await this._addClientCommand(clientId, action)) { + this._log.trace(`Client ${clientId} got a new action`, [command, args]); + await this._tracker.addChangedID(clientId); + try { + telemetryExtra.deviceID = + this.service.identity.hashedDeviceID(clientId); + } catch (_) {} + + this.service.recordTelemetryEvent( + "sendcommand", + command, + undefined, + telemetryExtra + ); + } else { + this._log.trace(`Client ${clientId} got a duplicate action`, [ + command, + args, + ]); + } + }, + + /** + * Check if the local client has any remote commands and perform them. + * + * @return false to abort sync + */ + async processIncomingCommands() { + return this._notify("clients:process-commands", "", async function () { + if ( + !this.localCommands || + (this._lastModifiedOnProcessCommands == this._localClientLastModified && + !this.ignoreLastModifiedOnProcessCommands) + ) { + return true; + } + this._lastModifiedOnProcessCommands = this._localClientLastModified; + + const clearedCommands = await this._readCommands()[this.localID]; + const commands = this.localCommands.filter( + command => !hasDupeCommand(clearedCommands, command) + ); + let didRemoveCommand = false; + // Process each command in order. + for (let rawCommand of commands) { + let shouldRemoveCommand = true; // most commands are auto-removed. + let { command, args, flowID } = rawCommand; + this._log.debug("Processing command " + command, args); + + this.service.recordTelemetryEvent( + "processcommand", + command, + undefined, + { flowID } + ); + + let engines = [args[0]]; + switch (command) { + case "resetAll": + engines = null; + // Fallthrough + case "resetEngine": + await this.service.resetClient(engines); + break; + case "wipeEngine": + await this.service.wipeClient(engines); + break; + case "logout": + this.service.logout(); + return false; + default: + this._log.warn("Received an unknown command: " + command); + break; + } + // Add the command to the "cleared" commands list + if (shouldRemoveCommand) { + await this.removeLocalCommand(rawCommand); + didRemoveCommand = true; + } + } + if (didRemoveCommand) { + await this._tracker.addChangedID(this.localID); + } + + return true; + })(); + }, + + /** + * Validates and sends a command to a client or all clients. + * + * Calling this does not actually sync the command data to the server. If the + * client already has the command/args pair, it won't receive a duplicate + * command. + * This method is async since it writes the command to a file. + * + * @param command + * Command to invoke on remote clients + * @param args + * Array of arguments to give to the command + * @param clientId + * Client ID to send command to. If undefined, send to all remote + * clients. + * @param flowID + * A unique identifier used to track success for this operation across + * devices. + */ + async sendCommand(command, args, clientId = null, telemetryExtra = {}) { + let commandData = this._commands[command]; + // Don't send commands that we don't know about. + if (!commandData) { + this._log.error("Unknown command to send: " + command); + return; + } else if (!args || args.length != commandData.args) { + // Don't send a command with the wrong number of arguments. + this._log.error( + "Expected " + + commandData.args + + " args for '" + + command + + "', but got " + + args + ); + return; + } + + // We allocate a "flowID" here, so it is used for each client. + telemetryExtra = Object.assign({}, telemetryExtra); // don't clobber the caller's object + if (!telemetryExtra.flowID) { + telemetryExtra.flowID = Utils.makeGUID(); + } + + if (clientId) { + await this._sendCommandToClient(command, args, clientId, telemetryExtra); + } else { + for (let [id, record] of Object.entries(this._store._remoteClients)) { + if (!record.stale) { + await this._sendCommandToClient(command, args, id, telemetryExtra); + } + } + } + }, + + async _removeRemoteClient(id) { + delete this._store._remoteClients[id]; + await this._tracker.removeChangedID(id); + await this._removeClientCommands(id); + this._modified.delete(id); + }, +}; +Object.setPrototypeOf(ClientEngine.prototype, SyncEngine.prototype); + +function ClientStore(name, engine) { + Store.call(this, name, engine); +} +ClientStore.prototype = { + _remoteClients: {}, + + async create(record) { + await this.update(record); + }, + + async update(record) { + if (record.id == this.engine.localID) { + // Only grab commands from the server; local name/type always wins + this.engine.localCommands = record.commands; + } else { + this._remoteClients[record.id] = record.cleartext; + } + }, + + async createRecord(id, collection) { + let record = new ClientsRec(collection, id); + + const commandsChanges = this.engine._currentlySyncingCommands + ? this.engine._currentlySyncingCommands[id] + : []; + + // Package the individual components into a record for the local client + if (id == this.engine.localID) { + try { + record.fxaDeviceId = await this.engine.fxAccounts.device.getLocalId(); + } catch (error) { + this._log.warn("failed to get fxa device id", error); + } + record.name = this.engine.localName; + record.type = this.engine.localType; + record.version = Services.appinfo.version; + record.protocols = SUPPORTED_PROTOCOL_VERSIONS; + + // Substract the commands we recorded that we've already executed + if ( + commandsChanges && + commandsChanges.length && + this.engine.localCommands && + this.engine.localCommands.length + ) { + record.commands = this.engine.localCommands.filter( + command => !hasDupeCommand(commandsChanges, command) + ); + } + + // Optional fields. + record.os = Services.appinfo.OS; // "Darwin" + record.appPackage = Services.appinfo.ID; + record.application = this.engine.brandName; // "Nightly" + + // We can't compute these yet. + // record.device = ""; // Bug 1100723 + // record.formfactor = ""; // Bug 1100722 + } else { + record.cleartext = Object.assign({}, this._remoteClients[id]); + delete record.cleartext.serverLastModified; // serverLastModified is a local only attribute. + + // Add the commands we have to send + if (commandsChanges && commandsChanges.length) { + const recordCommands = record.cleartext.commands || []; + const newCommands = commandsChanges.filter( + command => !hasDupeCommand(recordCommands, command) + ); + record.cleartext.commands = recordCommands.concat(newCommands); + } + + if (record.cleartext.stale) { + // It's almost certainly a logic error for us to upload a record we + // consider stale, so make log noise, but still remove the flag. + this._log.error( + `Preparing to upload record ${id} that we consider stale` + ); + delete record.cleartext.stale; + } + } + if (record.commands) { + const maxPayloadSize = + this.engine.service.getMemcacheMaxRecordPayloadSize(); + let origOrder = new Map(record.commands.map((c, i) => [c, i])); + // we sort first by priority, and second by age (indicated by order in the + // original list) + let commands = record.commands.slice().sort((a, b) => { + let infoA = this.engine._commands[a.command]; + let infoB = this.engine._commands[b.command]; + // Treat unknown command types as highest priority, to allow us to add + // high priority commands in the future without worrying about clients + // removing them on each-other unnecessarially. + let importA = infoA ? infoA.importance : 0; + let importB = infoB ? infoB.importance : 0; + // Higher importantance numbers indicate that we care less, so they + // go to the end of the list where they'll be popped off. + let importDelta = importA - importB; + if (importDelta != 0) { + return importDelta; + } + let origIdxA = origOrder.get(a); + let origIdxB = origOrder.get(b); + // Within equivalent priorities, we put older entries near the end + // of the list, so that they are removed first. + return origIdxB - origIdxA; + }); + let truncatedCommands = Utils.tryFitItems(commands, maxPayloadSize); + if (truncatedCommands.length != record.commands.length) { + this._log.warn( + `Removing commands from client ${id} (from ${record.commands.length} to ${truncatedCommands.length})` + ); + // Restore original order. + record.commands = truncatedCommands.sort( + (a, b) => origOrder.get(a) - origOrder.get(b) + ); + } + } + return record; + }, + + async itemExists(id) { + return id in (await this.getAllIDs()); + }, + + async getAllIDs() { + let ids = {}; + ids[this.engine.localID] = true; + for (let id in this._remoteClients) { + ids[id] = true; + } + return ids; + }, + + async wipe() { + this._remoteClients = {}; + }, +}; +Object.setPrototypeOf(ClientStore.prototype, Store.prototype); + +function ClientsTracker(name, engine) { + LegacyTracker.call(this, name, engine); +} +ClientsTracker.prototype = { + _enabled: false, + + onStart() { + Svc.Obs.add("fxaccounts:new_device_id", this.asyncObserver); + Services.prefs.addObserver( + PREF_ACCOUNT_ROOT + "device.name", + this.asyncObserver + ); + }, + onStop() { + Services.prefs.removeObserver( + PREF_ACCOUNT_ROOT + "device.name", + this.asyncObserver + ); + Svc.Obs.remove("fxaccounts:new_device_id", this.asyncObserver); + }, + + async observe(subject, topic, data) { + switch (topic) { + case "nsPref:changed": + this._log.debug("client.name preference changed"); + // Fallthrough intended. + case "fxaccounts:new_device_id": + await this.addChangedID(this.engine.localID); + this.score += SINGLE_USER_THRESHOLD + 1; // ALWAYS SYNC NOW. + break; + } + }, +}; +Object.setPrototypeOf(ClientsTracker.prototype, LegacyTracker.prototype); diff --git a/services/sync/modules/engines/extension-storage.sys.mjs b/services/sync/modules/engines/extension-storage.sys.mjs new file mode 100644 index 0000000000..f0ec21811a --- /dev/null +++ b/services/sync/modules/engines/extension-storage.sys.mjs @@ -0,0 +1,303 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; + +import { + BridgedEngine, + BridgeWrapperXPCOM, + LogAdapter, +} from "resource://services-sync/bridged_engine.sys.mjs"; +import { SyncEngine, Tracker } from "resource://services-sync/engines.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + MULTI_DEVICE_THRESHOLD: "resource://services-sync/constants.sys.mjs", + Observers: "resource://services-common/observers.sys.mjs", + SCORE_INCREMENT_MEDIUM: "resource://services-sync/constants.sys.mjs", + Svc: "resource://services-sync/util.sys.mjs", + extensionStorageSync: "resource://gre/modules/ExtensionStorageSync.sys.mjs", + + extensionStorageSyncKinto: + "resource://gre/modules/ExtensionStorageSyncKinto.sys.mjs", +}); + +XPCOMUtils.defineLazyServiceGetter( + lazy, + "StorageSyncService", + "@mozilla.org/extensions/storage/sync;1", + "nsIInterfaceRequestor" +); + +const PREF_FORCE_ENABLE = "engine.extension-storage.force"; + +// A helper to indicate whether extension-storage is enabled - it's based on +// the "addons" pref. The same logic is shared between both engine impls. +function getEngineEnabled() { + // By default, we sync extension storage if we sync addons. This + // lets us simplify the UX since users probably don't consider + // "extension preferences" a separate category of syncing. + // However, we also respect engine.extension-storage.force, which + // can be set to true or false, if a power user wants to customize + // the behavior despite the lack of UI. + const forced = lazy.Svc.Prefs.get(PREF_FORCE_ENABLE, undefined); + if (forced !== undefined) { + return forced; + } + return lazy.Svc.Prefs.get("engine.addons", false); +} + +function setEngineEnabled(enabled) { + // This will be called by the engine manager when declined on another device. + // Things will go a bit pear-shaped if the engine manager tries to end up + // with 'addons' and 'extension-storage' in different states - however, this + // *can* happen given we support the `engine.extension-storage.force` + // preference. So if that pref exists, we set it to this value. If that pref + // doesn't exist, we just ignore it and hope that the 'addons' engine is also + // going to be set to the same state. + if (lazy.Svc.Prefs.has(PREF_FORCE_ENABLE)) { + lazy.Svc.Prefs.set(PREF_FORCE_ENABLE, enabled); + } +} + +// A "bridged engine" to our webext-storage component. +export function ExtensionStorageEngineBridge(service) { + this.component = lazy.StorageSyncService.getInterface( + Ci.mozIBridgedSyncEngine + ); + BridgedEngine.call(this, "Extension-Storage", service); + this._bridge = new BridgeWrapperXPCOM(this.component); + + let app_services_logger = Cc["@mozilla.org/appservices/logger;1"].getService( + Ci.mozIAppServicesLogger + ); + let logger_target = "app-services:webext_storage:sync"; + app_services_logger.register(logger_target, new LogAdapter(this._log)); +} + +ExtensionStorageEngineBridge.prototype = { + syncPriority: 10, + + // Used to override the engine name in telemetry, so that we can distinguish . + overrideTelemetryName: "rust-webext-storage", + + _notifyPendingChanges() { + return new Promise(resolve => { + this.component + .QueryInterface(Ci.mozISyncedExtensionStorageArea) + .fetchPendingSyncChanges({ + QueryInterface: ChromeUtils.generateQI([ + "mozIExtensionStorageListener", + "mozIExtensionStorageCallback", + ]), + onChanged: (extId, json) => { + try { + lazy.extensionStorageSync.notifyListeners( + extId, + JSON.parse(json) + ); + } catch (ex) { + this._log.warn( + `Error notifying change listeners for ${extId}`, + ex + ); + } + }, + handleSuccess: resolve, + handleError: (code, message) => { + this._log.warn( + "Error fetching pending synced changes", + message, + code + ); + resolve(); + }, + }); + }); + }, + + _takeMigrationInfo() { + return new Promise((resolve, reject) => { + this.component + .QueryInterface(Ci.mozIExtensionStorageArea) + .takeMigrationInfo({ + QueryInterface: ChromeUtils.generateQI([ + "mozIExtensionStorageCallback", + ]), + handleSuccess: result => { + resolve(result ? JSON.parse(result) : null); + }, + handleError: (code, message) => { + this._log.warn("Error fetching migration info", message, code); + // `takeMigrationInfo` doesn't actually perform the migration, + // just reads (and clears) any data stored in the DB from the + // previous migration. + // + // Any errors here are very likely occurring a good while + // after the migration ran, so we just warn and pretend + // nothing was there. + resolve(null); + }, + }); + }); + }, + + async _syncStartup() { + let result = await super._syncStartup(); + let info = await this._takeMigrationInfo(); + if (info) { + lazy.Observers.notify( + "weave:telemetry:migration", + info, + "webext-storage" + ); + } + return result; + }, + + async _processIncoming() { + await super._processIncoming(); + try { + await this._notifyPendingChanges(); + } catch (ex) { + // Failing to notify `storage.onChanged` observers is bad, but shouldn't + // interrupt syncing. + this._log.warn("Error notifying about synced changes", ex); + } + }, + + get enabled() { + return getEngineEnabled(); + }, + set enabled(enabled) { + setEngineEnabled(enabled); + }, +}; +Object.setPrototypeOf( + ExtensionStorageEngineBridge.prototype, + BridgedEngine.prototype +); + +/** + ***************************************************************************** + * + * Deprecated support for Kinto + * + ***************************************************************************** + */ + +/** + * The Engine that manages syncing for the web extension "storage" + * API, and in particular ext.storage.sync. + * + * ext.storage.sync is implemented using Kinto, so it has mechanisms + * for syncing that we do not need to integrate in the Firefox Sync + * framework, so this is something of a stub. + */ +export function ExtensionStorageEngineKinto(service) { + SyncEngine.call(this, "Extension-Storage", service); + XPCOMUtils.defineLazyPreferenceGetter( + this, + "_skipPercentageChance", + "services.sync.extension-storage.skipPercentageChance", + 0 + ); +} + +ExtensionStorageEngineKinto.prototype = { + _trackerObj: ExtensionStorageTracker, + // we don't need these since we implement our own sync logic + _storeObj: undefined, + _recordObj: undefined, + + syncPriority: 10, + allowSkippedRecord: false, + + async _sync() { + return lazy.extensionStorageSyncKinto.syncAll(); + }, + + get enabled() { + return getEngineEnabled(); + }, + // We only need the enabled setter for the edge-case where info/collections + // has `extension-storage` - which could happen if the pref to flip the new + // engine on was once set but no longer is. + set enabled(enabled) { + setEngineEnabled(enabled); + }, + + _wipeClient() { + return lazy.extensionStorageSyncKinto.clearAll(); + }, + + shouldSkipSync(syncReason) { + if (syncReason == "user" || syncReason == "startup") { + this._log.info( + `Not skipping extension storage sync: reason == ${syncReason}` + ); + // Always sync if a user clicks the button, or if we're starting up. + return false; + } + // Ensure this wouldn't cause a resync... + if (this._tracker.score >= lazy.MULTI_DEVICE_THRESHOLD) { + this._log.info( + "Not skipping extension storage sync: Would trigger resync anyway" + ); + return false; + } + + let probability = this._skipPercentageChance / 100.0; + // Math.random() returns a value in the interval [0, 1), so `>` is correct: + // if `probability` is 1 skip every time, and if it's 0, never skip. + let shouldSkip = probability > Math.random(); + + this._log.info( + `Skipping extension-storage sync with a chance of ${probability}: ${shouldSkip}` + ); + return shouldSkip; + }, +}; +Object.setPrototypeOf( + ExtensionStorageEngineKinto.prototype, + SyncEngine.prototype +); + +function ExtensionStorageTracker(name, engine) { + Tracker.call(this, name, engine); + this._ignoreAll = false; +} +ExtensionStorageTracker.prototype = { + get ignoreAll() { + return this._ignoreAll; + }, + + set ignoreAll(value) { + this._ignoreAll = value; + }, + + onStart() { + lazy.Svc.Obs.add("ext.storage.sync-changed", this.asyncObserver); + }, + + onStop() { + lazy.Svc.Obs.remove("ext.storage.sync-changed", this.asyncObserver); + }, + + async observe(subject, topic, data) { + if (this.ignoreAll) { + return; + } + + if (topic !== "ext.storage.sync-changed") { + return; + } + + // Single adds, removes and changes are not so important on their + // own, so let's just increment score a bit. + this.score += lazy.SCORE_INCREMENT_MEDIUM; + }, +}; +Object.setPrototypeOf(ExtensionStorageTracker.prototype, Tracker.prototype); diff --git a/services/sync/modules/engines/forms.sys.mjs b/services/sync/modules/engines/forms.sys.mjs new file mode 100644 index 0000000000..3516327659 --- /dev/null +++ b/services/sync/modules/engines/forms.sys.mjs @@ -0,0 +1,298 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import { + Store, + SyncEngine, + LegacyTracker, +} from "resource://services-sync/engines.sys.mjs"; + +import { CryptoWrapper } from "resource://services-sync/record.sys.mjs"; +import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; + +import { SCORE_INCREMENT_MEDIUM } from "resource://services-sync/constants.sys.mjs"; +import { + CollectionProblemData, + CollectionValidator, +} from "resource://services-sync/collection_validator.sys.mjs"; + +import { Async } from "resource://services-common/async.sys.mjs"; +import { Log } from "resource://gre/modules/Log.sys.mjs"; + +const lazy = {}; +ChromeUtils.defineESModuleGetters(lazy, { + FormHistory: "resource://gre/modules/FormHistory.sys.mjs", +}); + +const FORMS_TTL = 3 * 365 * 24 * 60 * 60; // Three years in seconds. + +export function FormRec(collection, id) { + CryptoWrapper.call(this, collection, id); +} + +FormRec.prototype = { + _logName: "Sync.Record.Form", + ttl: FORMS_TTL, +}; +Object.setPrototypeOf(FormRec.prototype, CryptoWrapper.prototype); + +Utils.deferGetSet(FormRec, "cleartext", ["name", "value"]); + +var FormWrapper = { + _log: Log.repository.getLogger("Sync.Engine.Forms"), + + _getEntryCols: ["fieldname", "value"], + _guidCols: ["guid"], + + _search(terms, searchData) { + return lazy.FormHistory.search(terms, searchData); + }, + + async _update(changes) { + if (!lazy.FormHistory.enabled) { + return; // update isn't going to do anything. + } + await lazy.FormHistory.update(changes).catch(console.error); + }, + + async getEntry(guid) { + let results = await this._search(this._getEntryCols, { guid }); + if (!results.length) { + return null; + } + return { name: results[0].fieldname, value: results[0].value }; + }, + + async getGUID(name, value) { + // Query for the provided entry. + let query = { fieldname: name, value }; + let results = await this._search(this._guidCols, query); + return results.length ? results[0].guid : null; + }, + + async hasGUID(guid) { + // We could probably use a count function here, but search exists... + let results = await this._search(this._guidCols, { guid }); + return !!results.length; + }, + + async replaceGUID(oldGUID, newGUID) { + let changes = { + op: "update", + guid: oldGUID, + newGuid: newGUID, + }; + await this._update(changes); + }, +}; + +export function FormEngine(service) { + SyncEngine.call(this, "Forms", service); +} + +FormEngine.prototype = { + _storeObj: FormStore, + _trackerObj: FormTracker, + _recordObj: FormRec, + + syncPriority: 6, + + get prefName() { + return "history"; + }, + + async _findDupe(item) { + return FormWrapper.getGUID(item.name, item.value); + }, +}; +Object.setPrototypeOf(FormEngine.prototype, SyncEngine.prototype); + +function FormStore(name, engine) { + Store.call(this, name, engine); +} +FormStore.prototype = { + async _processChange(change) { + // If this._changes is defined, then we are applying a batch, so we + // can defer it. + if (this._changes) { + this._changes.push(change); + return; + } + + // Otherwise we must handle the change right now. + await FormWrapper._update(change); + }, + + async applyIncomingBatch(records, countTelemetry) { + Async.checkAppReady(); + // We collect all the changes to be made then apply them all at once. + this._changes = []; + let failures = await Store.prototype.applyIncomingBatch.call( + this, + records, + countTelemetry + ); + if (this._changes.length) { + await FormWrapper._update(this._changes); + } + delete this._changes; + return failures; + }, + + async getAllIDs() { + let results = await FormWrapper._search(["guid"], []); + let guids = {}; + for (let result of results) { + guids[result.guid] = true; + } + return guids; + }, + + async changeItemID(oldID, newID) { + await FormWrapper.replaceGUID(oldID, newID); + }, + + async itemExists(id) { + return FormWrapper.hasGUID(id); + }, + + async createRecord(id, collection) { + let record = new FormRec(collection, id); + let entry = await FormWrapper.getEntry(id); + if (entry != null) { + record.name = entry.name; + record.value = entry.value; + } else { + record.deleted = true; + } + return record; + }, + + async create(record) { + this._log.trace("Adding form record for " + record.name); + let change = { + op: "add", + guid: record.id, + fieldname: record.name, + value: record.value, + }; + await this._processChange(change); + }, + + async remove(record) { + this._log.trace("Removing form record: " + record.id); + let change = { + op: "remove", + guid: record.id, + }; + await this._processChange(change); + }, + + async update(record) { + this._log.trace("Ignoring form record update request!"); + }, + + async wipe() { + let change = { + op: "remove", + }; + await FormWrapper._update(change); + }, +}; +Object.setPrototypeOf(FormStore.prototype, Store.prototype); + +function FormTracker(name, engine) { + LegacyTracker.call(this, name, engine); +} +FormTracker.prototype = { + QueryInterface: ChromeUtils.generateQI([ + "nsIObserver", + "nsISupportsWeakReference", + ]), + + onStart() { + Svc.Obs.add("satchel-storage-changed", this.asyncObserver); + }, + + onStop() { + Svc.Obs.remove("satchel-storage-changed", this.asyncObserver); + }, + + async observe(subject, topic, data) { + if (this.ignoreAll) { + return; + } + switch (topic) { + case "satchel-storage-changed": + if (data == "formhistory-add" || data == "formhistory-remove") { + let guid = subject.QueryInterface(Ci.nsISupportsString).toString(); + await this.trackEntry(guid); + } + break; + } + }, + + async trackEntry(guid) { + const added = await this.addChangedID(guid); + if (added) { + this.score += SCORE_INCREMENT_MEDIUM; + } + }, +}; +Object.setPrototypeOf(FormTracker.prototype, LegacyTracker.prototype); + +class FormsProblemData extends CollectionProblemData { + getSummary() { + // We don't support syncing deleted form data, so "clientMissing" isn't a problem + return super.getSummary().filter(entry => entry.name !== "clientMissing"); + } +} + +export class FormValidator extends CollectionValidator { + constructor() { + super("forms", "id", ["name", "value"]); + this.ignoresMissingClients = true; + } + + emptyProblemData() { + return new FormsProblemData(); + } + + async getClientItems() { + return FormWrapper._search(["guid", "fieldname", "value"], {}); + } + + normalizeClientItem(item) { + return { + id: item.guid, + guid: item.guid, + name: item.fieldname, + fieldname: item.fieldname, + value: item.value, + original: item, + }; + } + + async normalizeServerItem(item) { + let res = Object.assign( + { + guid: item.id, + fieldname: item.name, + original: item, + }, + item + ); + // Missing `name` or `value` causes the getGUID call to throw + if (item.name !== undefined && item.value !== undefined) { + let guid = await FormWrapper.getGUID(item.name, item.value); + if (guid) { + res.guid = guid; + res.id = guid; + res.duped = true; + } + } + + return res; + } +} diff --git a/services/sync/modules/engines/history.sys.mjs b/services/sync/modules/engines/history.sys.mjs new file mode 100644 index 0000000000..491e7c8a89 --- /dev/null +++ b/services/sync/modules/engines/history.sys.mjs @@ -0,0 +1,585 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +const HISTORY_TTL = 5184000; // 60 days in milliseconds +const THIRTY_DAYS_IN_MS = 2592000000; // 30 days in milliseconds + +import { Async } from "resource://services-common/async.sys.mjs"; +import { CommonUtils } from "resource://services-common/utils.sys.mjs"; + +import { + MAX_HISTORY_DOWNLOAD, + MAX_HISTORY_UPLOAD, + SCORE_INCREMENT_SMALL, + SCORE_INCREMENT_XLARGE, +} from "resource://services-sync/constants.sys.mjs"; + +import { + Store, + SyncEngine, + LegacyTracker, +} from "resource://services-sync/engines.sys.mjs"; +import { CryptoWrapper } from "resource://services-sync/record.sys.mjs"; +import { Utils } from "resource://services-sync/util.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + PlacesSyncUtils: "resource://gre/modules/PlacesSyncUtils.sys.mjs", + PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", +}); + +export function HistoryRec(collection, id) { + CryptoWrapper.call(this, collection, id); +} + +HistoryRec.prototype = { + _logName: "Sync.Record.History", + ttl: HISTORY_TTL, +}; +Object.setPrototypeOf(HistoryRec.prototype, CryptoWrapper.prototype); + +Utils.deferGetSet(HistoryRec, "cleartext", ["histUri", "title", "visits"]); + +export function HistoryEngine(service) { + SyncEngine.call(this, "History", service); +} + +HistoryEngine.prototype = { + _recordObj: HistoryRec, + _storeObj: HistoryStore, + _trackerObj: HistoryTracker, + downloadLimit: MAX_HISTORY_DOWNLOAD, + + syncPriority: 7, + + async getSyncID() { + return lazy.PlacesSyncUtils.history.getSyncId(); + }, + + async ensureCurrentSyncID(newSyncID) { + this._log.debug( + "Checking if server sync ID ${newSyncID} matches existing", + { newSyncID } + ); + await lazy.PlacesSyncUtils.history.ensureCurrentSyncId(newSyncID); + return newSyncID; + }, + + async resetSyncID() { + // First, delete the collection on the server. It's fine if we're + // interrupted here: on the next sync, we'll detect that our old sync ID is + // now stale, and start over as a first sync. + await this._deleteServerCollection(); + // Then, reset our local sync ID. + return this.resetLocalSyncID(); + }, + + async resetLocalSyncID() { + let newSyncID = await lazy.PlacesSyncUtils.history.resetSyncId(); + this._log.debug("Assigned new sync ID ${newSyncID}", { newSyncID }); + return newSyncID; + }, + + async getLastSync() { + let lastSync = await lazy.PlacesSyncUtils.history.getLastSync(); + return lastSync; + }, + + async setLastSync(lastSync) { + await lazy.PlacesSyncUtils.history.setLastSync(lastSync); + }, + + shouldSyncURL(url) { + return !url.startsWith("file:"); + }, + + async pullNewChanges() { + const changedIDs = await this._tracker.getChangedIDs(); + let modifiedGUIDs = Object.keys(changedIDs); + if (!modifiedGUIDs.length) { + return {}; + } + + let guidsToRemove = + await lazy.PlacesSyncUtils.history.determineNonSyncableGuids( + modifiedGUIDs + ); + await this._tracker.removeChangedID(...guidsToRemove); + return changedIDs; + }, + + async _resetClient() { + await super._resetClient(); + await lazy.PlacesSyncUtils.history.reset(); + }, +}; +Object.setPrototypeOf(HistoryEngine.prototype, SyncEngine.prototype); + +function HistoryStore(name, engine) { + Store.call(this, name, engine); +} + +HistoryStore.prototype = { + // We try and only update this many visits at one time. + MAX_VISITS_PER_INSERT: 500, + + // Some helper functions to handle GUIDs + async setGUID(uri, guid) { + if (!guid) { + guid = Utils.makeGUID(); + } + + try { + await lazy.PlacesSyncUtils.history.changeGuid(uri, guid); + } catch (e) { + this._log.error("Error setting GUID ${guid} for URI ${uri}", guid, uri); + } + + return guid; + }, + + async GUIDForUri(uri, create) { + // Use the existing GUID if it exists + let guid; + try { + guid = await lazy.PlacesSyncUtils.history.fetchGuidForURL(uri); + } catch (e) { + this._log.error("Error fetching GUID for URL ${uri}", uri); + } + + // If the URI has an existing GUID, return it. + if (guid) { + return guid; + } + + // If the URI doesn't have a GUID and we were indicated to create one. + if (create) { + return this.setGUID(uri); + } + + // If the URI doesn't have a GUID and we didn't create one for it. + return null; + }, + + async changeItemID(oldID, newID) { + let info = await lazy.PlacesSyncUtils.history.fetchURLInfoForGuid(oldID); + if (!info) { + throw new Error(`Can't change ID for nonexistent history entry ${oldID}`); + } + this.setGUID(info.url, newID); + }, + + async getAllIDs() { + let urls = await lazy.PlacesSyncUtils.history.getAllURLs({ + since: new Date(Date.now() - THIRTY_DAYS_IN_MS), + limit: MAX_HISTORY_UPLOAD, + }); + + let urlsByGUID = {}; + for (let url of urls) { + if (!this.engine.shouldSyncURL(url)) { + continue; + } + let guid = await this.GUIDForUri(url, true); + urlsByGUID[guid] = url; + } + return urlsByGUID; + }, + + async applyIncomingBatch(records, countTelemetry) { + // Convert incoming records to mozIPlaceInfo objects which are applied as + // either history additions or removals. + let failed = []; + let toAdd = []; + let toRemove = []; + await Async.yieldingForEach(records, async record => { + if (record.deleted) { + toRemove.push(record); + } else { + try { + let pageInfo = await this._recordToPlaceInfo(record); + if (pageInfo) { + toAdd.push(pageInfo); + } + } catch (ex) { + if (Async.isShutdownException(ex)) { + throw ex; + } + this._log.error("Failed to create a place info", ex); + this._log.trace("The record that failed", record); + failed.push(record.id); + countTelemetry.addIncomingFailedReason(ex.message); + } + } + }); + if (toAdd.length || toRemove.length) { + if (toRemove.length) { + // PlacesUtils.history.remove takes an array of visits to remove, + // but the error semantics are tricky - a single "bad" entry will cause + // an exception before anything is removed. So we do remove them one at + // a time. + await Async.yieldingForEach(toRemove, async record => { + try { + await this.remove(record); + } catch (ex) { + if (Async.isShutdownException(ex)) { + throw ex; + } + this._log.error("Failed to delete a place info", ex); + this._log.trace("The record that failed", record); + failed.push(record.id); + countTelemetry.addIncomingFailedReason(ex.message); + } + }); + } + for (let chunk of this._generateChunks(toAdd)) { + // Per bug 1415560, we ignore any exceptions returned by insertMany + // as they are likely to be spurious. We do supply an onError handler + // and log the exceptions seen there as they are likely to be + // informative, but we still never abort the sync based on them. + try { + await lazy.PlacesUtils.history.insertMany( + chunk, + null, + failedVisit => { + this._log.info( + "Failed to insert a history record", + failedVisit.guid + ); + this._log.trace("The record that failed", failedVisit); + failed.push(failedVisit.guid); + } + ); + } catch (ex) { + this._log.info("Failed to insert history records", ex); + countTelemetry.addIncomingFailedReason(ex.message); + } + } + } + + return failed; + }, + + /** + * Returns a generator that splits records into sanely sized chunks suitable + * for passing to places to prevent places doing bad things at shutdown. + */ + *_generateChunks(records) { + // We chunk based on the number of *visits* inside each record. However, + // we do not split a single record into multiple records, because at some + // time in the future, we intend to ensure these records are ordered by + // lastModified, and advance the engine's timestamp as we process them, + // meaning we can resume exactly where we left off next sync - although + // currently that's not done, so we will retry the entire batch next sync + // if interrupted. + // ie, this means that if a single record has more than MAX_VISITS_PER_INSERT + // visits, we will call insertMany() with exactly 1 record, but with + // more than MAX_VISITS_PER_INSERT visits. + let curIndex = 0; + this._log.debug(`adding ${records.length} records to history`); + while (curIndex < records.length) { + Async.checkAppReady(); // may throw if we are shutting down. + let toAdd = []; // what we are going to insert. + let count = 0; // a counter which tells us when toAdd is full. + do { + let record = records[curIndex]; + curIndex += 1; + toAdd.push(record); + count += record.visits.length; + } while ( + curIndex < records.length && + count + records[curIndex].visits.length <= this.MAX_VISITS_PER_INSERT + ); + this._log.trace(`adding ${toAdd.length} items in this chunk`); + yield toAdd; + } + }, + + /* An internal helper to determine if we can add an entry to places. + Exists primarily so tests can override it. + */ + _canAddURI(uri) { + return lazy.PlacesUtils.history.canAddURI(uri); + }, + + /** + * Converts a Sync history record to a mozIPlaceInfo. + * + * Throws if an invalid record is encountered (invalid URI, etc.), + * returns a new PageInfo object if the record is to be applied, null + * otherwise (no visits to add, etc.), + */ + async _recordToPlaceInfo(record) { + // Sort out invalid URIs and ones Places just simply doesn't want. + record.url = lazy.PlacesUtils.normalizeToURLOrGUID(record.histUri); + record.uri = CommonUtils.makeURI(record.histUri); + + if (!Utils.checkGUID(record.id)) { + this._log.warn("Encountered record with invalid GUID: " + record.id); + return null; + } + record.guid = record.id; + + if ( + !this._canAddURI(record.uri) || + !this.engine.shouldSyncURL(record.uri.spec) + ) { + this._log.trace( + "Ignoring record " + + record.id + + " with URI " + + record.uri.spec + + ": can't add this URI." + ); + return null; + } + + // We dupe visits by date and type. So an incoming visit that has + // the same timestamp and type as a local one won't get applied. + // To avoid creating new objects, we rewrite the query result so we + // can simply check for containment below. + let curVisitsAsArray = []; + let curVisits = new Set(); + try { + curVisitsAsArray = await lazy.PlacesSyncUtils.history.fetchVisitsForURL( + record.histUri + ); + } catch (e) { + this._log.error( + "Error while fetching visits for URL ${record.histUri}", + record.histUri + ); + } + let oldestAllowed = + lazy.PlacesSyncUtils.bookmarks.EARLIEST_BOOKMARK_TIMESTAMP; + if (curVisitsAsArray.length == 20) { + let oldestVisit = curVisitsAsArray[curVisitsAsArray.length - 1]; + oldestAllowed = lazy.PlacesSyncUtils.history.clampVisitDate( + lazy.PlacesUtils.toDate(oldestVisit.date).getTime() + ); + } + + let i, k; + for (i = 0; i < curVisitsAsArray.length; i++) { + // Same logic as used in the loop below to generate visitKey. + let { date, type } = curVisitsAsArray[i]; + let dateObj = lazy.PlacesUtils.toDate(date); + let millis = lazy.PlacesSyncUtils.history + .clampVisitDate(dateObj) + .getTime(); + curVisits.add(`${millis},${type}`); + } + + // Walk through the visits, make sure we have sound data, and eliminate + // dupes. The latter is done by rewriting the array in-place. + for (i = 0, k = 0; i < record.visits.length; i++) { + let visit = (record.visits[k] = record.visits[i]); + + if ( + !visit.date || + typeof visit.date != "number" || + !Number.isInteger(visit.date) + ) { + this._log.warn( + "Encountered record with invalid visit date: " + visit.date + ); + continue; + } + + if ( + !visit.type || + !Object.values(lazy.PlacesUtils.history.TRANSITIONS).includes( + visit.type + ) + ) { + this._log.warn( + "Encountered record with invalid visit type: " + + visit.type + + "; ignoring." + ); + continue; + } + + // Dates need to be integers. Future and far past dates are clamped to the + // current date and earliest sensible date, respectively. + let originalVisitDate = lazy.PlacesUtils.toDate(Math.round(visit.date)); + visit.date = + lazy.PlacesSyncUtils.history.clampVisitDate(originalVisitDate); + + if (visit.date.getTime() < oldestAllowed) { + // Visit is older than the oldest visit we have, and we have so many + // visits for this uri that we hit our limit when inserting. + continue; + } + let visitKey = `${visit.date.getTime()},${visit.type}`; + if (curVisits.has(visitKey)) { + // Visit is a dupe, don't increment 'k' so the element will be + // overwritten. + continue; + } + + // Note the visit key, so that we don't add duplicate visits with + // clamped timestamps. + curVisits.add(visitKey); + + visit.transition = visit.type; + k += 1; + } + record.visits.length = k; // truncate array + + // No update if there aren't any visits to apply. + // History wants at least one visit. + // In any case, the only thing we could change would be the title + // and that shouldn't change without a visit. + if (!record.visits.length) { + this._log.trace( + "Ignoring record " + + record.id + + " with URI " + + record.uri.spec + + ": no visits to add." + ); + return null; + } + + // PageInfo is validated using validateItemProperties which does a shallow + // copy of the properties. Since record uses getters some of the properties + // are not copied over. Thus we create and return a new object. + let pageInfo = { + title: record.title, + url: record.url, + guid: record.guid, + visits: record.visits, + }; + + return pageInfo; + }, + + async remove(record) { + this._log.trace("Removing page: " + record.id); + let removed = await lazy.PlacesUtils.history.remove(record.id); + if (removed) { + this._log.trace("Removed page: " + record.id); + } else { + this._log.debug("Page already removed: " + record.id); + } + }, + + async itemExists(id) { + return !!(await lazy.PlacesSyncUtils.history.fetchURLInfoForGuid(id)); + }, + + async createRecord(id, collection) { + let foo = await lazy.PlacesSyncUtils.history.fetchURLInfoForGuid(id); + let record = new HistoryRec(collection, id); + if (foo) { + record.histUri = foo.url; + record.title = foo.title; + record.sortindex = foo.frecency; + try { + record.visits = await lazy.PlacesSyncUtils.history.fetchVisitsForURL( + record.histUri + ); + } catch (e) { + this._log.error( + "Error while fetching visits for URL ${record.histUri}", + record.histUri + ); + record.visits = []; + } + } else { + record.deleted = true; + } + + return record; + }, + + async wipe() { + return lazy.PlacesSyncUtils.history.wipe(); + }, +}; +Object.setPrototypeOf(HistoryStore.prototype, Store.prototype); + +function HistoryTracker(name, engine) { + LegacyTracker.call(this, name, engine); +} +HistoryTracker.prototype = { + onStart() { + this._log.info("Adding Places observer."); + this._placesObserver = new PlacesWeakCallbackWrapper( + this.handlePlacesEvents.bind(this) + ); + PlacesObservers.addListener( + ["page-visited", "history-cleared", "page-removed"], + this._placesObserver + ); + }, + + onStop() { + this._log.info("Removing Places observer."); + if (this._placesObserver) { + PlacesObservers.removeListener( + ["page-visited", "history-cleared", "page-removed"], + this._placesObserver + ); + } + }, + + QueryInterface: ChromeUtils.generateQI(["nsISupportsWeakReference"]), + + handlePlacesEvents(aEvents) { + this.asyncObserver.enqueueCall(() => this._handlePlacesEvents(aEvents)); + }, + + async _handlePlacesEvents(aEvents) { + if (this.ignoreAll) { + this._log.trace( + "ignoreAll: ignoring visits [" + + aEvents.map(v => v.guid).join(",") + + "]" + ); + return; + } + for (let event of aEvents) { + switch (event.type) { + case "page-visited": { + this._log.trace("'page-visited': " + event.url); + if ( + this.engine.shouldSyncURL(event.url) && + (await this.addChangedID(event.pageGuid)) + ) { + this.score += SCORE_INCREMENT_SMALL; + } + break; + } + case "history-cleared": { + this._log.trace("history-cleared"); + // Note that we're going to trigger a sync, but none of the cleared + // pages are tracked, so the deletions will not be propagated. + // See Bug 578694. + this.score += SCORE_INCREMENT_XLARGE; + break; + } + case "page-removed": { + if (event.reason === PlacesVisitRemoved.REASON_EXPIRED) { + return; + } + + this._log.trace( + "page-removed: " + event.url + ", reason " + event.reason + ); + const added = await this.addChangedID(event.pageGuid); + if (added) { + this.score += event.isRemovedFromStore + ? SCORE_INCREMENT_XLARGE + : SCORE_INCREMENT_SMALL; + } + break; + } + } + } + }, +}; +Object.setPrototypeOf(HistoryTracker.prototype, LegacyTracker.prototype); diff --git a/services/sync/modules/engines/passwords.sys.mjs b/services/sync/modules/engines/passwords.sys.mjs new file mode 100644 index 0000000000..8f24d7333a --- /dev/null +++ b/services/sync/modules/engines/passwords.sys.mjs @@ -0,0 +1,565 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import { + Collection, + CryptoWrapper, +} from "resource://services-sync/record.sys.mjs"; + +import { SCORE_INCREMENT_XLARGE } from "resource://services-sync/constants.sys.mjs"; +import { CollectionValidator } from "resource://services-sync/collection_validator.sys.mjs"; +import { + Store, + SyncEngine, + LegacyTracker, +} from "resource://services-sync/engines.sys.mjs"; +import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; + +import { Async } from "resource://services-common/async.sys.mjs"; + +// These are valid fields the server could have for a logins record +// we mainly use this to detect if there are any unknownFields and +// store (but don't process) those fields to roundtrip them back +const VALID_LOGIN_FIELDS = [ + "id", + "displayOrigin", + "formSubmitURL", + "formActionOrigin", + "httpRealm", + "hostname", + "origin", + "password", + "passwordField", + "timeCreated", + "timeLastUsed", + "timePasswordChanged", + "timesUsed", + "username", + "usernameField", + "unknownFields", +]; + +const SYNCABLE_LOGIN_FIELDS = [ + // `nsILoginInfo` fields. + "hostname", + "formSubmitURL", + "httpRealm", + "username", + "password", + "usernameField", + "passwordField", + + // `nsILoginMetaInfo` fields. + "timeCreated", + "timePasswordChanged", +]; + +// Compares two logins to determine if their syncable fields changed. The login +// manager fires `modifyLogin` for changes to all fields, including ones we +// don't sync. In particular, `timeLastUsed` changes shouldn't mark the login +// for upload; otherwise, we might overwrite changed passwords before they're +// downloaded (bug 973166). +function isSyncableChange(oldLogin, newLogin) { + oldLogin.QueryInterface(Ci.nsILoginMetaInfo).QueryInterface(Ci.nsILoginInfo); + newLogin.QueryInterface(Ci.nsILoginMetaInfo).QueryInterface(Ci.nsILoginInfo); + for (let property of SYNCABLE_LOGIN_FIELDS) { + if (oldLogin[property] != newLogin[property]) { + return true; + } + } + return false; +} + +export function LoginRec(collection, id) { + CryptoWrapper.call(this, collection, id); +} + +LoginRec.prototype = { + _logName: "Sync.Record.Login", + + cleartextToString() { + let o = Object.assign({}, this.cleartext); + if (o.password) { + o.password = "X".repeat(o.password.length); + } + return JSON.stringify(o); + }, +}; +Object.setPrototypeOf(LoginRec.prototype, CryptoWrapper.prototype); + +Utils.deferGetSet(LoginRec, "cleartext", [ + "hostname", + "formSubmitURL", + "httpRealm", + "username", + "password", + "usernameField", + "passwordField", + "timeCreated", + "timePasswordChanged", +]); + +export function PasswordEngine(service) { + SyncEngine.call(this, "Passwords", service); +} + +PasswordEngine.prototype = { + _storeObj: PasswordStore, + _trackerObj: PasswordTracker, + _recordObj: LoginRec, + + syncPriority: 2, + + // Metadata for syncing is stored in the login manager + async ensureCurrentSyncID(newSyncID) { + return Services.logins.ensureCurrentSyncID(newSyncID); + }, + + async getLastSync() { + let legacyValue = await super.getLastSync(); + if (legacyValue) { + await this.setLastSync(legacyValue); + Svc.Prefs.reset(this.name + ".lastSync"); + this._log.debug( + `migrated timestamp of ${legacyValue} to the logins store` + ); + return legacyValue; + } + return Services.logins.getLastSync(); + }, + + async setLastSync(timestamp) { + await Services.logins.setLastSync(timestamp); + }, + + async _syncFinish() { + await SyncEngine.prototype._syncFinish.call(this); + + // Delete the Weave credentials from the server once. + if (!Svc.Prefs.get("deletePwdFxA", false)) { + try { + let ids = []; + for (let host of Utils.getSyncCredentialsHosts()) { + for (let info of Services.logins.findLogins(host, "", "")) { + ids.push(info.QueryInterface(Ci.nsILoginMetaInfo).guid); + } + } + if (ids.length) { + let coll = new Collection(this.engineURL, null, this.service); + coll.ids = ids; + let ret = await coll.delete(); + this._log.debug("Delete result: " + ret); + if (!ret.success && ret.status != 400) { + // A non-400 failure means try again next time. + return; + } + } else { + this._log.debug("Didn't find any passwords to delete"); + } + // If there were no ids to delete, or we succeeded, or got a 400, + // record success. + Svc.Prefs.set("deletePwdFxA", true); + Svc.Prefs.reset("deletePwd"); // The old prefname we previously used. + } catch (ex) { + if (Async.isShutdownException(ex)) { + throw ex; + } + this._log.debug("Password deletes failed", ex); + } + } + }, + + async _findDupe(item) { + let login = this._store._nsLoginInfoFromRecord(item); + if (!login) { + return null; + } + + let logins = Services.logins.findLogins( + login.origin, + login.formActionOrigin, + login.httpRealm + ); + + await Async.promiseYield(); // Yield back to main thread after synchronous operation. + + // Look for existing logins that match the origin, but ignore the password. + for (let local of logins) { + if (login.matches(local, true) && local instanceof Ci.nsILoginMetaInfo) { + return local.guid; + } + } + + return null; + }, + + async pullAllChanges() { + let changes = {}; + let ids = await this._store.getAllIDs(); + for (let [id, info] of Object.entries(ids)) { + changes[id] = info.timePasswordChanged / 1000; + } + return changes; + }, + + getValidator() { + return new PasswordValidator(); + }, +}; +Object.setPrototypeOf(PasswordEngine.prototype, SyncEngine.prototype); + +function PasswordStore(name, engine) { + Store.call(this, name, engine); + this._nsLoginInfo = new Components.Constructor( + "@mozilla.org/login-manager/loginInfo;1", + Ci.nsILoginInfo, + "init" + ); +} +PasswordStore.prototype = { + _newPropertyBag() { + return Cc["@mozilla.org/hash-property-bag;1"].createInstance( + Ci.nsIWritablePropertyBag2 + ); + }, + + // Returns an stringified object of any fields not "known" by this client + // mainly used to to prevent data loss for other clients by roundtripping + // these fields without processing them + _processUnknownFields(record) { + let unknownFields = {}; + let keys = Object.keys(record); + keys + .filter(key => !VALID_LOGIN_FIELDS.includes(key)) + .forEach(key => { + unknownFields[key] = record[key]; + }); + // If we found some unknown fields, we stringify it to be able + // to properly encrypt it for roundtripping since we can't know if + // it contained sensitive fields or not + if (Object.keys(unknownFields).length) { + return JSON.stringify(unknownFields); + } + return null; + }, + + /** + * Return an instance of nsILoginInfo (and, implicitly, nsILoginMetaInfo). + */ + _nsLoginInfoFromRecord(record) { + function nullUndefined(x) { + return x == undefined ? null : x; + } + + function stringifyNullUndefined(x) { + return x == undefined || x == null ? "" : x; + } + + if (record.formSubmitURL && record.httpRealm) { + this._log.warn( + "Record " + + record.id + + " has both formSubmitURL and httpRealm. Skipping." + ); + return null; + } + + // Passing in "undefined" results in an empty string, which later + // counts as a value. Explicitly `|| null` these fields according to JS + // truthiness. Records with empty strings or null will be unmolested. + let info = new this._nsLoginInfo( + record.hostname, + nullUndefined(record.formSubmitURL), + nullUndefined(record.httpRealm), + stringifyNullUndefined(record.username), + record.password, + record.usernameField, + record.passwordField + ); + + info.QueryInterface(Ci.nsILoginMetaInfo); + info.guid = record.id; + if (record.timeCreated && !isNaN(new Date(record.timeCreated).getTime())) { + info.timeCreated = record.timeCreated; + } + if ( + record.timePasswordChanged && + !isNaN(new Date(record.timePasswordChanged).getTime()) + ) { + info.timePasswordChanged = record.timePasswordChanged; + } + + // Check the record if there are any unknown fields from other clients + // that we want to roundtrip during sync to prevent data loss + let unknownFields = this._processUnknownFields(record.cleartext); + if (unknownFields) { + info.unknownFields = unknownFields; + } + return info; + }, + + async _getLoginFromGUID(id) { + let prop = this._newPropertyBag(); + prop.setPropertyAsAUTF8String("guid", id); + + let logins = Services.logins.searchLogins(prop); + await Async.promiseYield(); // Yield back to main thread after synchronous operation. + + if (logins.length) { + this._log.trace(logins.length + " items matching " + id + " found."); + return logins[0]; + } + + this._log.trace("No items matching " + id + " found. Ignoring"); + return null; + }, + + async getAllIDs() { + let items = {}; + let logins = Services.logins.getAllLogins(); + + for (let i = 0; i < logins.length; i++) { + // Skip over Weave password/passphrase entries. + let metaInfo = logins[i].QueryInterface(Ci.nsILoginMetaInfo); + if (Utils.getSyncCredentialsHosts().has(metaInfo.origin)) { + continue; + } + + items[metaInfo.guid] = metaInfo; + } + + return items; + }, + + async changeItemID(oldID, newID) { + this._log.trace("Changing item ID: " + oldID + " to " + newID); + + let oldLogin = await this._getLoginFromGUID(oldID); + if (!oldLogin) { + this._log.trace("Can't change item ID: item doesn't exist"); + return; + } + if (await this._getLoginFromGUID(newID)) { + this._log.trace("Can't change item ID: new ID already in use"); + return; + } + + let prop = this._newPropertyBag(); + prop.setPropertyAsAUTF8String("guid", newID); + + Services.logins.modifyLogin(oldLogin, prop); + }, + + async itemExists(id) { + return !!(await this._getLoginFromGUID(id)); + }, + + async createRecord(id, collection) { + let record = new LoginRec(collection, id); + let login = await this._getLoginFromGUID(id); + + if (!login) { + record.deleted = true; + return record; + } + + record.hostname = login.origin; + record.formSubmitURL = login.formActionOrigin; + record.httpRealm = login.httpRealm; + record.username = login.username; + record.password = login.password; + record.usernameField = login.usernameField; + record.passwordField = login.passwordField; + + // Optional fields. + login.QueryInterface(Ci.nsILoginMetaInfo); + record.timeCreated = login.timeCreated; + record.timePasswordChanged = login.timePasswordChanged; + + // put the unknown fields back to the top-level record + // during upload + if (login.unknownFields) { + let unknownFields = JSON.parse(login.unknownFields); + if (unknownFields) { + Object.keys(unknownFields).forEach(key => { + // We have to manually add it to the cleartext since that's + // what gets processed during upload + record.cleartext[key] = unknownFields[key]; + }); + } + } + + return record; + }, + + async create(record) { + let login = this._nsLoginInfoFromRecord(record); + if (!login) { + return; + } + + this._log.trace("Adding login for " + record.hostname); + this._log.trace( + "httpRealm: " + + JSON.stringify(login.httpRealm) + + "; " + + "formSubmitURL: " + + JSON.stringify(login.formActionOrigin) + ); + await Services.logins.addLoginAsync(login); + }, + + async remove(record) { + this._log.trace("Removing login " + record.id); + + let loginItem = await this._getLoginFromGUID(record.id); + if (!loginItem) { + this._log.trace("Asked to remove record that doesn't exist, ignoring"); + return; + } + + Services.logins.removeLogin(loginItem); + }, + + async update(record) { + let loginItem = await this._getLoginFromGUID(record.id); + if (!loginItem) { + this._log.trace("Skipping update for unknown item: " + record.hostname); + return; + } + + this._log.trace("Updating " + record.hostname); + let newinfo = this._nsLoginInfoFromRecord(record); + if (!newinfo) { + return; + } + + Services.logins.modifyLogin(loginItem, newinfo); + }, + + async wipe() { + Services.logins.removeAllUserFacingLogins(); + }, +}; +Object.setPrototypeOf(PasswordStore.prototype, Store.prototype); + +function PasswordTracker(name, engine) { + LegacyTracker.call(this, name, engine); +} +PasswordTracker.prototype = { + onStart() { + Svc.Obs.add("passwordmgr-storage-changed", this.asyncObserver); + }, + + onStop() { + Svc.Obs.remove("passwordmgr-storage-changed", this.asyncObserver); + }, + + async observe(subject, topic, data) { + if (this.ignoreAll) { + return; + } + + // A single add, remove or change or removing all items + // will trigger a sync for MULTI_DEVICE. + switch (data) { + case "modifyLogin": { + subject.QueryInterface(Ci.nsIArrayExtensions); + let oldLogin = subject.GetElementAt(0); + let newLogin = subject.GetElementAt(1); + if (!isSyncableChange(oldLogin, newLogin)) { + this._log.trace(`${data}: Ignoring change for ${newLogin.guid}`); + break; + } + const tracked = await this._trackLogin(newLogin); + if (tracked) { + this._log.trace(`${data}: Tracking change for ${newLogin.guid}`); + } + break; + } + + case "addLogin": + case "removeLogin": + subject + .QueryInterface(Ci.nsILoginMetaInfo) + .QueryInterface(Ci.nsILoginInfo); + const tracked = await this._trackLogin(subject); + if (tracked) { + this._log.trace(data + ": " + subject.guid); + } + break; + + // Bug 1613620: We iterate through the removed logins and track them to ensure + // the logins are deleted across synced devices/accounts + case "removeAllLogins": + subject.QueryInterface(Ci.nsIArrayExtensions); + let count = subject.Count(); + for (let i = 0; i < count; i++) { + let currentSubject = subject.GetElementAt(i); + let tracked = await this._trackLogin(currentSubject); + if (tracked) { + this._log.trace(data + ": " + currentSubject.guid); + } + } + this.score += SCORE_INCREMENT_XLARGE; + break; + } + }, + + async _trackLogin(login) { + if (Utils.getSyncCredentialsHosts().has(login.origin)) { + // Skip over Weave password/passphrase changes. + return false; + } + const added = await this.addChangedID(login.guid); + if (!added) { + return false; + } + this.score += SCORE_INCREMENT_XLARGE; + return true; + }, +}; +Object.setPrototypeOf(PasswordTracker.prototype, LegacyTracker.prototype); + +export class PasswordValidator extends CollectionValidator { + constructor() { + super("passwords", "id", [ + "hostname", + "formSubmitURL", + "httpRealm", + "password", + "passwordField", + "username", + "usernameField", + ]); + } + + getClientItems() { + let logins = Services.logins.getAllLogins(); + let syncHosts = Utils.getSyncCredentialsHosts(); + let result = logins + .map(l => l.QueryInterface(Ci.nsILoginMetaInfo)) + .filter(l => !syncHosts.has(l.origin)); + return Promise.resolve(result); + } + + normalizeClientItem(item) { + return { + id: item.guid, + guid: item.guid, + hostname: item.hostname, + formSubmitURL: item.formSubmitURL, + httpRealm: item.httpRealm, + password: item.password, + passwordField: item.passwordField, + username: item.username, + usernameField: item.usernameField, + original: item, + }; + } + + async normalizeServerItem(item) { + return Object.assign({ guid: item.id }, item); + } +} diff --git a/services/sync/modules/engines/prefs.sys.mjs b/services/sync/modules/engines/prefs.sys.mjs new file mode 100644 index 0000000000..13d8a63070 --- /dev/null +++ b/services/sync/modules/engines/prefs.sys.mjs @@ -0,0 +1,467 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Prefs which start with this prefix are our "control" prefs - they indicate +// which preferences should be synced. +const PREF_SYNC_PREFS_PREFIX = "services.sync.prefs.sync."; + +// Prefs which have a default value are usually not synced - however, if the +// preference exists under this prefix and the value is: +// * `true`, then we do sync default values. +// * `false`, then as soon as we ever sync a non-default value out, or sync +// any value in, then we toggle the value to `true`. +// +// We never explicitly set this pref back to false, so it's one-shot. +// Some preferences which are known to have a different default value on +// different platforms have this preference with a default value of `false`, +// so they don't sync until one device changes to the non-default value, then +// that value forever syncs, even if it gets reset back to the default. +// Note that preferences handled this way *must also* have the "normal" +// control pref set. +// A possible future enhancement would be to sync these prefs so that +// other distributions can flag them if they change the default, but that +// doesn't seem worthwhile until we can be confident they'd actually create +// this special control pref at the same time they flip the default. +const PREF_SYNC_SEEN_PREFIX = "services.sync.prefs.sync-seen."; + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; +import { Preferences } from "resource://gre/modules/Preferences.sys.mjs"; + +import { + Store, + SyncEngine, + Tracker, +} from "resource://services-sync/engines.sys.mjs"; +import { CryptoWrapper } from "resource://services-sync/record.sys.mjs"; +import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; +import { SCORE_INCREMENT_XLARGE } from "resource://services-sync/constants.sys.mjs"; +import { CommonUtils } from "resource://services-common/utils.sys.mjs"; + +const lazy = {}; + +XPCOMUtils.defineLazyGetter(lazy, "PREFS_GUID", () => + CommonUtils.encodeBase64URL(Services.appinfo.ID) +); + +ChromeUtils.defineESModuleGetters(lazy, { + AddonManager: "resource://gre/modules/AddonManager.sys.mjs", +}); + +// In bug 1538015, we decided that it isn't always safe to allow all "incoming" +// preferences to be applied locally. So we have introduced another preference, +// which if false (the default) will ignore all incoming preferences which don't +// already have the "control" preference locally set. If this new +// preference is set to true, then we continue our old behavior of allowing all +// preferences to be updated, even those which don't already have a local +// "control" pref. +const PREF_SYNC_PREFS_ARBITRARY = + "services.sync.prefs.dangerously_allow_arbitrary"; + +XPCOMUtils.defineLazyPreferenceGetter( + lazy, + "ALLOW_ARBITRARY", + PREF_SYNC_PREFS_ARBITRARY +); + +// The SUMO supplied URL we log with more information about how custom prefs can +// continue to be synced. SUMO have told us that this URL will remain "stable". +const PREFS_DOC_URL_TEMPLATE = + "https://support.mozilla.org/1/firefox/%VERSION%/%OS%/%LOCALE%/sync-custom-preferences"; +XPCOMUtils.defineLazyGetter(lazy, "PREFS_DOC_URL", () => + Services.urlFormatter.formatURL(PREFS_DOC_URL_TEMPLATE) +); + +// Check for a local control pref or PREF_SYNC_PREFS_ARBITRARY +function isAllowedPrefName(prefName) { + if (prefName == PREF_SYNC_PREFS_ARBITRARY) { + return false; // never allow this. + } + if (lazy.ALLOW_ARBITRARY) { + // user has set the "dangerous" pref, so everything is allowed. + return true; + } + // The pref must already have a control pref set, although it doesn't matter + // here whether that value is true or false. We can't use prefHasUserValue + // here because we also want to check prefs still with default values. + try { + Services.prefs.getBoolPref(PREF_SYNC_PREFS_PREFIX + prefName); + // pref exists! + return true; + } catch (_) { + return false; + } +} + +export function PrefRec(collection, id) { + CryptoWrapper.call(this, collection, id); +} + +PrefRec.prototype = { + _logName: "Sync.Record.Pref", +}; +Object.setPrototypeOf(PrefRec.prototype, CryptoWrapper.prototype); + +Utils.deferGetSet(PrefRec, "cleartext", ["value"]); + +export function PrefsEngine(service) { + SyncEngine.call(this, "Prefs", service); +} + +PrefsEngine.prototype = { + _storeObj: PrefStore, + _trackerObj: PrefTracker, + _recordObj: PrefRec, + version: 2, + + syncPriority: 1, + allowSkippedRecord: false, + + async getChangedIDs() { + // No need for a proper timestamp (no conflict resolution needed). + let changedIDs = {}; + if (this._tracker.modified) { + changedIDs[lazy.PREFS_GUID] = 0; + } + return changedIDs; + }, + + async _wipeClient() { + await SyncEngine.prototype._wipeClient.call(this); + this.justWiped = true; + }, + + async _reconcile(item) { + // Apply the incoming item if we don't care about the local data + if (this.justWiped) { + this.justWiped = false; + return true; + } + return SyncEngine.prototype._reconcile.call(this, item); + }, + + async trackRemainingChanges() { + if (this._modified.count() > 0) { + this._tracker.modified = true; + } + }, +}; +Object.setPrototypeOf(PrefsEngine.prototype, SyncEngine.prototype); + +// We don't use services.sync.engine.tabs.filteredSchemes since it includes +// about: pages and the like, which we want to be syncable in preferences. +// Blob, moz-extension, data and file uris are never safe to sync, +// so we limit our check to those. +const UNSYNCABLE_URL_REGEXP = /^(moz-extension|blob|data|file):/i; +function isUnsyncableURLPref(prefName) { + if (Services.prefs.getPrefType(prefName) != Ci.nsIPrefBranch.PREF_STRING) { + return false; + } + const prefValue = Services.prefs.getStringPref(prefName, ""); + return UNSYNCABLE_URL_REGEXP.test(prefValue); +} + +function PrefStore(name, engine) { + Store.call(this, name, engine); + Svc.Obs.add( + "profile-before-change", + function () { + this.__prefs = null; + }, + this + ); +} +PrefStore.prototype = { + __prefs: null, + get _prefs() { + if (!this.__prefs) { + this.__prefs = new Preferences(); + } + return this.__prefs; + }, + + _getSyncPrefs() { + let syncPrefs = Services.prefs + .getBranch(PREF_SYNC_PREFS_PREFIX) + .getChildList("") + .filter(pref => isAllowedPrefName(pref) && !isUnsyncableURLPref(pref)); + // Also sync preferences that determine which prefs get synced. + let controlPrefs = syncPrefs.map(pref => PREF_SYNC_PREFS_PREFIX + pref); + return controlPrefs.concat(syncPrefs); + }, + + _isSynced(pref) { + if (pref.startsWith(PREF_SYNC_PREFS_PREFIX)) { + // this is an incoming control pref, which is ignored if there's not already + // a local control pref for the preference. + let controlledPref = pref.slice(PREF_SYNC_PREFS_PREFIX.length); + return isAllowedPrefName(controlledPref); + } + + // This is the pref itself - it must be both allowed, and have a control + // pref which is true. + if (!this._prefs.get(PREF_SYNC_PREFS_PREFIX + pref, false)) { + return false; + } + return isAllowedPrefName(pref); + }, + + _getAllPrefs() { + let values = {}; + for (let pref of this._getSyncPrefs()) { + // Note: _isSynced doesn't call isUnsyncableURLPref since it would cause + // us not to apply (syncable) changes to preferences that are set locally + // which have unsyncable urls. + if (this._isSynced(pref) && !isUnsyncableURLPref(pref)) { + let isSet = this._prefs.isSet(pref); + // Missing and default prefs get the null value, unless that `seen` + // pref is set, in which case it always gets the value. + let forceValue = this._prefs.get(PREF_SYNC_SEEN_PREFIX + pref, false); + values[pref] = isSet || forceValue ? this._prefs.get(pref, null) : null; + // If this is a special "sync-seen" pref, and it's not the default value, + // set the seen pref to true. + if ( + isSet && + this._prefs.get(PREF_SYNC_SEEN_PREFIX + pref, false) === false + ) { + this._log.trace(`toggling sync-seen pref for '${pref}' to true`); + this._prefs.set(PREF_SYNC_SEEN_PREFIX + pref, true); + } + } + } + return values; + }, + + _setAllPrefs(values) { + const selectedThemeIDPref = "extensions.activeThemeID"; + let selectedThemeIDBefore = this._prefs.get(selectedThemeIDPref, null); + let selectedThemeIDAfter = selectedThemeIDBefore; + + // Update 'services.sync.prefs.sync.foo.pref' before 'foo.pref', otherwise + // _isSynced returns false when 'foo.pref' doesn't exist (e.g., on a new device). + let prefs = Object.keys(values).sort( + a => -a.indexOf(PREF_SYNC_PREFS_PREFIX) + ); + for (let pref of prefs) { + let value = values[pref]; + if (!this._isSynced(pref)) { + // An extra complication just so we can warn when we decline to sync a + // preference due to no local control pref. + if (!pref.startsWith(PREF_SYNC_PREFS_PREFIX)) { + // this is an incoming pref - if the incoming value is not null and + // there's no local control pref, then it means we would have previously + // applied a value, but now will decline to. + // We need to check this here rather than in _isSynced because the + // default list of prefs we sync has changed, so we don't want to report + // this message when we wouldn't have actually applied a value. + // We should probably remove all of this in ~ Firefox 80. + if (value !== null) { + // null means "use the default value" + let controlPref = PREF_SYNC_PREFS_PREFIX + pref; + let controlPrefExists; + try { + Services.prefs.getBoolPref(controlPref); + controlPrefExists = true; + } catch (ex) { + controlPrefExists = false; + } + if (!controlPrefExists) { + // This is a long message and written to both the sync log and the + // console, but note that users who have not customized the control + // prefs will never see this. + let msg = + `Not syncing the preference '${pref}' because it has no local ` + + `control preference (${PREF_SYNC_PREFS_PREFIX}${pref}) and ` + + `the preference ${PREF_SYNC_PREFS_ARBITRARY} isn't true. ` + + `See ${lazy.PREFS_DOC_URL} for more information`; + console.warn(msg); + this._log.warn(msg); + } + } + } + continue; + } + + if (typeof value == "string" && UNSYNCABLE_URL_REGEXP.test(value)) { + this._log.trace(`Skipping incoming unsyncable url for pref: ${pref}`); + continue; + } + + switch (pref) { + // Some special prefs we don't want to set directly. + case selectedThemeIDPref: + selectedThemeIDAfter = value; + break; + + // default is to just set the pref + default: + if (value == null) { + // Pref has gone missing. The best we can do is reset it. + this._prefs.reset(pref); + } else { + try { + this._prefs.set(pref, value); + } catch (ex) { + this._log.trace(`Failed to set pref: ${pref}`, ex); + } + } + // If there's a "sync-seen" pref for this it gets toggled to true + // regardless of the value. + let seenPref = PREF_SYNC_SEEN_PREFIX + pref; + if (this._prefs.get(seenPref, undefined) === false) { + this._prefs.set(PREF_SYNC_SEEN_PREFIX + pref, true); + } + } + } + // Themes are a little messy. Themes which have been installed are handled + // by the addons engine - but default themes aren't seen by that engine. + // So if there's a new default theme ID and that ID corresponds to a + // system addon, then we arrange to enable that addon here. + if (selectedThemeIDBefore != selectedThemeIDAfter) { + this._maybeEnableBuiltinTheme(selectedThemeIDAfter).catch(e => { + this._log.error("Failed to maybe update the default theme", e); + }); + } + }, + + async _maybeEnableBuiltinTheme(themeId) { + let addon = null; + try { + addon = await lazy.AddonManager.getAddonByID(themeId); + } catch (ex) { + this._log.trace( + `There's no addon with ID '${themeId} - it can't be a builtin theme` + ); + return; + } + if (addon && addon.isBuiltin && addon.type == "theme") { + this._log.trace(`Enabling builtin theme '${themeId}'`); + await addon.enable(); + } else { + this._log.trace( + `Have incoming theme ID of '${themeId}' but it's not a builtin theme` + ); + } + }, + + async getAllIDs() { + /* We store all prefs in just one WBO, with just one GUID */ + let allprefs = {}; + allprefs[lazy.PREFS_GUID] = true; + return allprefs; + }, + + async changeItemID(oldID, newID) { + this._log.trace("PrefStore GUID is constant!"); + }, + + async itemExists(id) { + return id === lazy.PREFS_GUID; + }, + + async createRecord(id, collection) { + let record = new PrefRec(collection, id); + + if (id == lazy.PREFS_GUID) { + record.value = this._getAllPrefs(); + } else { + record.deleted = true; + } + + return record; + }, + + async create(record) { + this._log.trace("Ignoring create request"); + }, + + async remove(record) { + this._log.trace("Ignoring remove request"); + }, + + async update(record) { + // Silently ignore pref updates that are for other apps. + if (record.id != lazy.PREFS_GUID) { + return; + } + + this._log.trace("Received pref updates, applying..."); + this._setAllPrefs(record.value); + }, + + async wipe() { + this._log.trace("Ignoring wipe request"); + }, +}; +Object.setPrototypeOf(PrefStore.prototype, Store.prototype); + +function PrefTracker(name, engine) { + Tracker.call(this, name, engine); + this._ignoreAll = false; + Svc.Obs.add("profile-before-change", this.asyncObserver); +} +PrefTracker.prototype = { + get ignoreAll() { + return this._ignoreAll; + }, + + set ignoreAll(value) { + this._ignoreAll = value; + }, + + get modified() { + return Svc.Prefs.get("engine.prefs.modified", false); + }, + set modified(value) { + Svc.Prefs.set("engine.prefs.modified", value); + }, + + clearChangedIDs: function clearChangedIDs() { + this.modified = false; + }, + + __prefs: null, + get _prefs() { + if (!this.__prefs) { + this.__prefs = new Preferences(); + } + return this.__prefs; + }, + + onStart() { + Services.prefs.addObserver("", this.asyncObserver); + }, + + onStop() { + this.__prefs = null; + Services.prefs.removeObserver("", this.asyncObserver); + }, + + async observe(subject, topic, data) { + switch (topic) { + case "profile-before-change": + await this.stop(); + break; + case "nsPref:changed": + if (this.ignoreAll) { + break; + } + // Trigger a sync for MULTI-DEVICE for a change that determines + // which prefs are synced or a regular pref change. + if ( + data.indexOf(PREF_SYNC_PREFS_PREFIX) == 0 || + this._prefs.get(PREF_SYNC_PREFS_PREFIX + data, false) + ) { + this.score += SCORE_INCREMENT_XLARGE; + this.modified = true; + this._log.trace("Preference " + data + " changed"); + } + break; + } + }, +}; +Object.setPrototypeOf(PrefTracker.prototype, Tracker.prototype); + +export function getPrefsGUIDForTest() { + return lazy.PREFS_GUID; +} diff --git a/services/sync/modules/engines/tabs.sys.mjs b/services/sync/modules/engines/tabs.sys.mjs new file mode 100644 index 0000000000..45242b71a7 --- /dev/null +++ b/services/sync/modules/engines/tabs.sys.mjs @@ -0,0 +1,624 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +const STORAGE_VERSION = 1; // This needs to be kept in-sync with the rust storage version + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; +import { SyncEngine, Tracker } from "resource://services-sync/engines.sys.mjs"; +import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; +import { Log } from "resource://gre/modules/Log.sys.mjs"; +import { + SCORE_INCREMENT_SMALL, + STATUS_OK, + URI_LENGTH_MAX, +} from "resource://services-sync/constants.sys.mjs"; +import { CommonUtils } from "resource://services-common/utils.sys.mjs"; +import { Async } from "resource://services-common/async.sys.mjs"; +import { + SyncRecord, + SyncTelemetry, +} from "resource://services-sync/telemetry.sys.mjs"; +import { BridgedEngine } from "resource://services-sync/bridged_engine.sys.mjs"; + +const FAR_FUTURE = 4102405200000; // 2100/01/01 + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", + PrivateBrowsingUtils: "resource://gre/modules/PrivateBrowsingUtils.sys.mjs", + ReaderMode: "resource://gre/modules/ReaderMode.sys.mjs", + TabsStore: "resource://gre/modules/RustTabs.sys.mjs", +}); + +XPCOMUtils.defineLazyPreferenceGetter( + lazy, + "TABS_FILTERED_SCHEMES", + "services.sync.engine.tabs.filteredSchemes", + "", + null, + val => { + return new Set(val.split("|")); + } +); + +XPCOMUtils.defineLazyPreferenceGetter( + lazy, + "SYNC_AFTER_DELAY_MS", + "services.sync.syncedTabs.syncDelayAfterTabChange", + 0 +); + +// A "bridged engine" to our tabs component. +export function TabEngine(service) { + BridgedEngine.call(this, "Tabs", service); +} + +TabEngine.prototype = { + _trackerObj: TabTracker, + syncPriority: 3, + + async prepareTheBridge(isQuickWrite) { + let clientsEngine = this.service.clientsEngine; + // Tell the bridged engine about clients. + // This is the same shape as ClientData in app-services. + // schema: https://github.com/mozilla/application-services/blob/a1168751231ed4e88c44d85f6dccc09c3b412bd2/components/sync15/src/client_types.rs#L14 + let clientData = { + local_client_id: clientsEngine.localID, + recent_clients: {}, + }; + + // We shouldn't upload tabs past what the server will accept + let tabs = await this.getTabsWithinPayloadSize(); + await this._rustStore.setLocalTabs( + tabs.map(tab => { + // rust wants lastUsed in MS but the provider gives it in seconds + tab.lastUsed = tab.lastUsed * 1000; + return tab; + }) + ); + + for (let remoteClient of clientsEngine.remoteClients) { + let id = remoteClient.id; + if (!id) { + throw new Error("Remote client somehow did not have an id"); + } + let client = { + fxa_device_id: remoteClient.fxaDeviceId, + // device_name and device_type are soft-deprecated - every client + // prefers what's in the FxA record. But fill them correctly anyway. + device_name: clientsEngine.getClientName(id) ?? "", + device_type: clientsEngine.getClientType(id), + }; + clientData.recent_clients[id] = client; + } + + // put ourself in there too so we record the correct device info in our sync record. + clientData.recent_clients[clientsEngine.localID] = { + fxa_device_id: await clientsEngine.fxAccounts.device.getLocalId(), + device_name: clientsEngine.localName, + device_type: clientsEngine.localType, + }; + + // Quick write needs to adjust the lastSync so we can POST to the server + // see quickWrite() for details + if (isQuickWrite) { + await this.setLastSync(FAR_FUTURE); + await this._bridge.prepareForSync(JSON.stringify(clientData)); + return; + } + + // Just incase we crashed while the lastSync timestamp was FAR_FUTURE, we + // reset it to zero + if ((await this.getLastSync()) === FAR_FUTURE) { + await this._bridge.setLastSync(0); + } + await this._bridge.prepareForSync(JSON.stringify(clientData)); + }, + + async _syncStartup() { + await super._syncStartup(); + await this.prepareTheBridge(); + }, + + async initialize() { + await SyncEngine.prototype.initialize.call(this); + + let path = PathUtils.join(PathUtils.profileDir, "synced-tabs.db"); + this._rustStore = await lazy.TabsStore.init(path); + this._bridge = await this._rustStore.bridgedEngine(); + + // Uniffi doesn't currently only support async methods, so we'll need to hardcode + // these values for now (which is fine for now as these hardly ever change) + this._bridge.storageVersion = STORAGE_VERSION; + this._bridge.allowSkippedRecord = true; + + this._log.info("Got a bridged engine!"); + this._tracker.modified = true; + }, + + async getChangedIDs() { + // No need for a proper timestamp (no conflict resolution needed). + let changedIDs = {}; + if (this._tracker.modified) { + changedIDs[this.service.clientsEngine.localID] = 0; + } + return changedIDs; + }, + + // API for use by Sync UI code to give user choices of tabs to open. + async getAllClients() { + let remoteTabs = await this._rustStore.getAll(); + let remoteClientTabs = []; + for (let remoteClient of this.service.clientsEngine.remoteClients) { + // We get the some client info from the rust tabs engine and some from + // the clients engine. + let rustClient = remoteTabs.find( + x => x.clientId === remoteClient.fxaDeviceId + ); + if (!rustClient) { + continue; + } + let client = { + // rust gives us ms but js uses seconds, so fix them up. + tabs: rustClient.remoteTabs.map(tab => { + tab.lastUsed = tab.lastUsed / 1000; + return tab; + }), + lastModified: rustClient.lastModified / 1000, + ...remoteClient, + }; + remoteClientTabs.push(client); + } + return remoteClientTabs; + }, + + async removeClientData() { + let url = this.engineURL + "/" + this.service.clientsEngine.localID; + await this.service.resource(url).delete(); + }, + + async trackRemainingChanges() { + if (this._modified.count() > 0) { + this._tracker.modified = true; + } + }, + + async getTabsWithinPayloadSize() { + const maxPayloadSize = this.service.getMaxRecordPayloadSize(); + // See bug 535326 comment 8 for an explanation of the estimation + const maxSerializedSize = (maxPayloadSize / 4) * 3 - 1500; + return TabProvider.getAllTabsWithEstimatedMax(true, maxSerializedSize); + }, + + // Support for "quick writes" + _engineLock: Utils.lock, + _engineLocked: false, + + // Tabs has a special lock to help support its "quick write" + get locked() { + return this._engineLocked; + }, + lock() { + if (this._engineLocked) { + return false; + } + this._engineLocked = true; + return true; + }, + unlock() { + this._engineLocked = false; + }, + + // Quickly do a POST of our current tabs if possible. + // This does things that would be dangerous for other engines - eg, posting + // without checking what's on the server could cause data-loss for other + // engines, but because each device exclusively owns exactly 1 tabs record + // with a known ID, it's safe here. + // Returns true if we successfully synced, false otherwise (either on error + // or because we declined to sync for any reason.) The return value is + // primarily for tests. + async quickWrite() { + if (!this.enabled) { + // this should be very rare, and only if tabs are disabled after the + // timer is created. + this._log.info("Can't do a quick-sync as tabs is disabled"); + return false; + } + // This quick-sync doesn't drive the login state correctly, so just + // decline to sync if out status is bad + if (this.service.status.checkSetup() != STATUS_OK) { + this._log.info( + "Can't do a quick-sync due to the service status", + this.service.status.toString() + ); + return false; + } + if (!this.service.serverConfiguration) { + this._log.info("Can't do a quick sync before the first full sync"); + return false; + } + try { + return await this._engineLock("tabs.js: quickWrite", async () => { + // We want to restore the lastSync timestamp when complete so next sync + // takes tabs written by other devices since our last real sync. + // And for this POST we don't want the protections offered by + // X-If-Unmodified-Since - we want the POST to work even if the remote + // has moved on and we will catch back up next full sync. + const origLastSync = await this.getLastSync(); + try { + return this._doQuickWrite(); + } finally { + // set the lastSync to it's original value for regular sync + await this.setLastSync(origLastSync); + } + })(); + } catch (ex) { + if (!Utils.isLockException(ex)) { + throw ex; + } + this._log.info( + "Can't do a quick-write as another tab sync is in progress" + ); + return false; + } + }, + + // The guts of the quick-write sync, after we've taken the lock, checked + // the service status etc. + async _doQuickWrite() { + // We need to track telemetry for these syncs too! + const name = "tabs"; + let telemetryRecord = new SyncRecord( + SyncTelemetry.allowedEngines, + "quick-write" + ); + telemetryRecord.onEngineStart(name); + try { + Async.checkAppReady(); + // We need to prep the bridge before we try to POST since it grabs + // the most recent local client id and properly sets a lastSync + // which is needed for a proper POST request + await this.prepareTheBridge(true); + this._tracker.clearChangedIDs(); + this._tracker.resetScore(); + + Async.checkAppReady(); + // now just the "upload" part of a sync, + // which for a rust engine is not obvious. + // We need to do is ask the rust engine for the changes. Although + // this is kinda abusing the bridged-engine interface, we know the tabs + // implementation of it works ok + let outgoing = await this._bridge.apply(); + // We know we always have exactly 1 record. + let mine = outgoing[0]; + this._log.trace("outgoing bso", mine); + // `this._recordObj` is a `BridgedRecord`, which isn't exported. + let record = this._recordObj.fromOutgoingBso(this.name, JSON.parse(mine)); + let changeset = {}; + changeset[record.id] = { synced: false, record }; + this._modified.replace(changeset); + + Async.checkAppReady(); + await this._uploadOutgoing(); + telemetryRecord.onEngineStop(name, null); + return true; + } catch (ex) { + this._log.warn("quicksync sync failed", ex); + telemetryRecord.onEngineStop(name, ex); + return false; + } finally { + // The top-level sync is never considered to fail here, just the engine + telemetryRecord.finished(null); + SyncTelemetry.takeTelemetryRecord(telemetryRecord); + } + }, + + async _sync() { + try { + await this._engineLock("tabs.js: fullSync", async () => { + await super._sync(); + })(); + } catch (ex) { + if (!Utils.isLockException(ex)) { + throw ex; + } + this._log.info( + "Can't do full tabs sync as a quick-write is currently running" + ); + } + }, +}; +Object.setPrototypeOf(TabEngine.prototype, BridgedEngine.prototype); + +export const TabProvider = { + getWindowEnumerator() { + return Services.wm.getEnumerator("navigator:browser"); + }, + + shouldSkipWindow(win) { + return win.closed || lazy.PrivateBrowsingUtils.isWindowPrivate(win); + }, + + getAllBrowserTabs() { + let tabs = []; + for (let win of this.getWindowEnumerator()) { + if (this.shouldSkipWindow(win)) { + continue; + } + // Get all the tabs from the browser + for (let tab of win.gBrowser.tabs) { + tabs.push(tab); + } + } + + return tabs.sort(function (a, b) { + return b.lastAccessed - a.lastAccessed; + }); + }, + + // This function creates tabs records up to a specified amount of bytes + // It is an "estimation" since we don't accurately calculate how much the + // favicon and JSON overhead is and give a rough estimate (for optimization purposes) + async getAllTabsWithEstimatedMax(filter, bytesMax) { + let log = Log.repository.getLogger(`Sync.Engine.Tabs.Provider`); + let tabRecords = []; + let iconPromises = []; + let runningByteLength = 0; + let encoder = new TextEncoder(); + + // Fetch all the tabs the user has open + let winTabs = this.getAllBrowserTabs(); + + for (let tab of winTabs) { + // We don't want to process any more tabs than we can sync + if (runningByteLength >= bytesMax) { + log.warn( + `Can't fit all tabs in sync payload: have ${winTabs.length}, + but can only fit ${tabRecords.length}.` + ); + break; + } + + // Note that we used to sync "tab history" (ie, the "back button") state, + // but in practice this hasn't been used - only the current URI is of + // interest to clients. + // We stopped recording this in bug 1783991. + if (!tab?.linkedBrowser) { + continue; + } + let acceptable = !filter + ? url => url + : url => + url && + !lazy.TABS_FILTERED_SCHEMES.has(Services.io.extractScheme(url)); + + let url = tab.linkedBrowser.currentURI?.spec; + // Special case for reader mode. + if (url && url.startsWith("about:reader?")) { + url = lazy.ReaderMode.getOriginalUrl(url); + } + // We ignore the tab completely if the current entry url is + // not acceptable (we need something accurate to open). + if (!acceptable(url)) { + continue; + } + + if (url.length > URI_LENGTH_MAX) { + log.trace("Skipping over-long URL."); + continue; + } + + let thisTab = { + title: tab.linkedBrowser.contentTitle || "", + urlHistory: [url], + icon: "", + lastUsed: Math.floor((tab.lastAccessed || 0) / 1000), + }; + tabRecords.push(thisTab); + + // we don't want to wait for each favicon to resolve to get the bytes + // so we estimate a conservative 100 chars for the favicon and json overhead + // Rust will further optimize and trim if we happened to be wildly off + runningByteLength += + encoder.encode(thisTab.title + thisTab.lastUsed + url).byteLength + 100; + + // Use the favicon service for the icon url - we can wait for the promises at the end. + let iconPromise = lazy.PlacesUtils.promiseFaviconData(url) + .then(iconData => { + thisTab.icon = iconData.uri.spec; + }) + .catch(ex => { + log.trace( + `Failed to fetch favicon for ${url}`, + thisTab.urlHistory[0] + ); + }); + iconPromises.push(iconPromise); + } + + await Promise.allSettled(iconPromises); + return tabRecords; + }, +}; + +function TabTracker(name, engine) { + Tracker.call(this, name, engine); + + // Make sure "this" pointer is always set correctly for event listeners. + this.onTab = Utils.bind2(this, this.onTab); + this._unregisterListeners = Utils.bind2(this, this._unregisterListeners); +} +TabTracker.prototype = { + QueryInterface: ChromeUtils.generateQI(["nsIObserver"]), + + clearChangedIDs() { + this.modified = false; + }, + + // We do not track TabSelect because that almost always triggers + // the web progress listeners (onLocationChange), which we already track + _topics: ["TabOpen", "TabClose"], + + _registerListenersForWindow(window) { + this._log.trace("Registering tab listeners in window"); + for (let topic of this._topics) { + window.addEventListener(topic, this.onTab); + } + window.addEventListener("unload", this._unregisterListeners); + // If it's got a tab browser we can listen for things like navigation. + if (window.gBrowser) { + window.gBrowser.addProgressListener(this); + } + }, + + _unregisterListeners(event) { + this._unregisterListenersForWindow(event.target); + }, + + _unregisterListenersForWindow(window) { + this._log.trace("Removing tab listeners in window"); + window.removeEventListener("unload", this._unregisterListeners); + for (let topic of this._topics) { + window.removeEventListener(topic, this.onTab); + } + if (window.gBrowser) { + window.gBrowser.removeProgressListener(this); + } + }, + + onStart() { + Svc.Obs.add("domwindowopened", this.asyncObserver); + for (let win of Services.wm.getEnumerator("navigator:browser")) { + this._registerListenersForWindow(win); + } + }, + + onStop() { + Svc.Obs.remove("domwindowopened", this.asyncObserver); + for (let win of Services.wm.getEnumerator("navigator:browser")) { + this._unregisterListenersForWindow(win); + } + }, + + async observe(subject, topic, data) { + switch (topic) { + case "domwindowopened": + let onLoad = () => { + subject.removeEventListener("load", onLoad); + // Only register after the window is done loading to avoid unloads. + this._registerListenersForWindow(subject); + }; + + // Add tab listeners now that a window has opened. + subject.addEventListener("load", onLoad); + break; + } + }, + + onTab(event) { + if (event.originalTarget.linkedBrowser) { + let browser = event.originalTarget.linkedBrowser; + if ( + lazy.PrivateBrowsingUtils.isBrowserPrivate(browser) && + !lazy.PrivateBrowsingUtils.permanentPrivateBrowsing + ) { + this._log.trace("Ignoring tab event from private browsing."); + return; + } + } + this._log.trace("onTab event: " + event.type); + + switch (event.type) { + case "TabOpen": + /* We do not have a reliable way of checking the URI on the TabOpen + * so we will rely on the other methods (onLocationChange, getAllTabsWithEstimatedMax) + * to filter these when going through sync + */ + this.callScheduleSync(SCORE_INCREMENT_SMALL); + break; + case "TabClose": + // If event target has `linkedBrowser`, the event target can be assumed <tab> element. + // Else, event target is assumed <browser> element, use the target as it is. + const tab = event.target.linkedBrowser || event.target; + + // TabClose means the tab has already loaded and we can check the URI + // and ignore if it's a scheme we don't care about + if (lazy.TABS_FILTERED_SCHEMES.has(tab.currentURI.scheme)) { + return; + } + this.callScheduleSync(SCORE_INCREMENT_SMALL); + break; + } + }, + + // web progress listeners. + onLocationChange(webProgress, request, locationURI, flags) { + // We only care about top-level location changes. We do want location changes in the + // same document because if a page uses the `pushState()` API, they *appear* as though + // they are in the same document even if the URL changes. It also doesn't hurt to accurately + // reflect the fragment changing - so we allow LOCATION_CHANGE_SAME_DOCUMENT + if ( + flags & Ci.nsIWebProgressListener.LOCATION_CHANGE_RELOAD || + !webProgress.isTopLevel || + !locationURI + ) { + return; + } + + // We can't filter out tabs that we don't sync here, because we might be + // navigating from a tab that we *did* sync to one we do not, and that + // tab we *did* sync should no longer be synced. + this.callScheduleSync(); + }, + + callScheduleSync(scoreIncrement) { + this.modified = true; + let { scheduler } = this.engine.service; + let delayInMs = lazy.SYNC_AFTER_DELAY_MS; + + // Schedule a sync once we detect a tab change + // to ensure the server always has the most up to date tabs + if ( + delayInMs > 0 && + scheduler.numClients > 1 // Only schedule quick syncs for multi client users + ) { + if (this.tabsQuickWriteTimer) { + this._log.debug( + "Detected a tab change, but a quick-write is already scheduled" + ); + return; + } + this._log.debug( + "Detected a tab change: scheduling a quick-write in " + delayInMs + "ms" + ); + CommonUtils.namedTimer( + () => { + this._log.trace("tab quick-sync timer fired."); + this.engine + .quickWrite() + .then(() => { + this._log.trace("tab quick-sync done."); + }) + .catch(ex => { + this._log.error("tab quick-sync failed.", ex); + }); + }, + delayInMs, + this, + "tabsQuickWriteTimer" + ); + } else if (scoreIncrement) { + this._log.debug( + "Detected a tab change, but conditions aren't met for a quick write - bumping score" + ); + this.score += scoreIncrement; + } else { + this._log.debug( + "Detected a tab change, but conditions aren't met for a quick write or a score bump" + ); + } + }, +}; +Object.setPrototypeOf(TabTracker.prototype, Tracker.prototype); |