/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ /** * This file has all the machinery for hooking up bridged engines implemented * in Rust. It's the JavaScript side of the Golden Gate bridge that connects * Desktop Sync to a Rust `BridgedEngine`, via the `mozIBridgedSyncEngine` * XPCOM interface. * * Creating a bridged engine only takes a few lines of code, since most of the * hard work is done on the Rust side. On the JS side, you'll need to subclass * `BridgedEngine` (instead of `SyncEngine`), supply a `mozIBridgedSyncEngine` * for your subclass to wrap, and optionally implement and override the tracker. */ const { Changeset, SyncEngine, Tracker } = ChromeUtils.import( "resource://services-sync/engines.js" ); const { RawCryptoWrapper } = ChromeUtils.import( "resource://services-sync/record.js" ); const lazy = {}; ChromeUtils.defineESModuleGetters(lazy, { Log: "resource://gre/modules/Log.sys.mjs", PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", }); var EXPORTED_SYMBOLS = ["BridgeWrapperXPCOM", "BridgedEngine", "LogAdapter"]; /** * A stub store that converts between raw decrypted incoming records and * envelopes. Since the interface we need is so minimal, this class doesn't * inherit from the base `Store` implementation...it would take more code to * override all those behaviors! * * This class isn't meant to be subclassed, because bridged engines shouldn't * override their store classes in `_storeObj`. */ class BridgedStore { constructor(name, engine) { if (!engine) { throw new Error("Store must be associated with an Engine instance."); } this.engine = engine; this._log = lazy.Log.repository.getLogger(`Sync.Engine.${name}.Store`); this._batchChunkSize = 500; } async applyIncomingBatch(records) { for (let chunk of lazy.PlacesUtils.chunkArray( records, this._batchChunkSize )) { let incomingEnvelopesAsJSON = chunk.map(record => JSON.stringify(record.toIncomingBso()) ); this._log.trace("incoming envelopes", incomingEnvelopesAsJSON); await this.engine._bridge.storeIncoming(incomingEnvelopesAsJSON); } // Array of failed records. return []; } async wipe() { await this.engine._bridge.wipe(); } } /** * A wrapper class to convert between BSOs on the JS side, and envelopes on the * Rust side. This class intentionally subclasses `RawCryptoWrapper`, because we * don't want the stringification and parsing machinery in `CryptoWrapper`. * * This class isn't meant to be subclassed, because bridged engines shouldn't * override their record classes in `_recordObj`. */ class BridgedRecord extends RawCryptoWrapper { /** * Creates an outgoing record from a BSO returned by a bridged engine. * * @param {String} collection The collection name. * @param {Object} bso The outgoing bso (ie, a sync15::bso::OutgoingBso) returned from * `mozIBridgedSyncEngine::apply`. * @return {BridgedRecord} A Sync record ready to encrypt and upload. */ static fromOutgoingBso(collection, bso) { // The BSO has already been JSON serialized coming out of Rust, so the // envelope has been flattened. if (typeof bso.id != "string") { throw new TypeError("Outgoing BSO missing ID"); } if (typeof bso.payload != "string") { throw new TypeError("Outgoing BSO missing payload"); } let record = new BridgedRecord(collection, bso.id); record.cleartext = bso.payload; return record; } transformBeforeEncrypt(cleartext) { if (typeof cleartext != "string") { throw new TypeError("Outgoing bridged engine records must be strings"); } return cleartext; } transformAfterDecrypt(cleartext) { if (typeof cleartext != "string") { throw new TypeError("Incoming bridged engine records must be strings"); } return cleartext; } /* * Converts this incoming record into an envelope to pass to a bridged engine. * This object must be kept in sync with `sync15::IncomingBso`. * * @return {Object} The incoming envelope, to pass to * `mozIBridgedSyncEngine::storeIncoming`. */ toIncomingBso() { return { id: this.data.id, modified: this.data.modified, payload: this.cleartext, }; } } class BridgeError extends Error { constructor(code, message) { super(message); this.name = "BridgeError"; // TODO: We may want to use a different name for this, since errors with // a `result` property are treated specially by telemetry, discarding the // message...but, unlike other `nserror`s, the message is actually useful, // and we still want to capture it. this.result = code; } } class InterruptedError extends Error { constructor(message) { super(message); this.name = "InterruptedError"; } } /** * Adapts a `Log.sys.mjs` logger to a `mozIServicesLogSink`. This class is copied * from `SyncedBookmarksMirror.jsm`. */ class LogAdapter { constructor(log) { this.log = log; } get maxLevel() { let level = this.log.level; if (level <= lazy.Log.Level.All) { return Ci.mozIServicesLogSink.LEVEL_TRACE; } if (level <= lazy.Log.Level.Info) { return Ci.mozIServicesLogSink.LEVEL_DEBUG; } if (level <= lazy.Log.Level.Warn) { return Ci.mozIServicesLogSink.LEVEL_WARN; } if (level <= lazy.Log.Level.Error) { return Ci.mozIServicesLogSink.LEVEL_ERROR; } return Ci.mozIServicesLogSink.LEVEL_OFF; } trace(message) { this.log.trace(message); } debug(message) { this.log.debug(message); } warn(message) { this.log.warn(message); } error(message) { this.log.error(message); } } // This converts the XPCOM-defined, callback-based mozIBridgedSyncEngine to // a promise-based implementation. class BridgeWrapperXPCOM { constructor(component) { this.comp = component; } // A few sync, non-callback based attributes. get storageVersion() { return this.comp.storageVersion; } get allowSkippedRecord() { return this.comp.allowSkippedRecord; } get logger() { return this.comp.logger; } // And the async functions we promisify. // Note this is `lastSync` via uniffi but `getLastSync` via xpcom lastSync() { return BridgeWrapperXPCOM.#promisify(this.comp.getLastSync); } setLastSync(lastSyncMillis) { return BridgeWrapperXPCOM.#promisify(this.comp.setLastSync, lastSyncMillis); } getSyncId() { return BridgeWrapperXPCOM.#promisify(this.comp.getSyncId); } resetSyncId() { return BridgeWrapperXPCOM.#promisify(this.comp.resetSyncId); } ensureCurrentSyncId(newSyncId) { return BridgeWrapperXPCOM.#promisify( this.comp.ensureCurrentSyncId, newSyncId ); } syncStarted() { return BridgeWrapperXPCOM.#promisify(this.comp.syncStarted); } storeIncoming(incomingEnvelopesAsJSON) { return BridgeWrapperXPCOM.#promisify( this.comp.storeIncoming, incomingEnvelopesAsJSON ); } apply() { return BridgeWrapperXPCOM.#promisify(this.comp.apply); } setUploaded(newTimestampMillis, uploadedIds) { return BridgeWrapperXPCOM.#promisify( this.comp.setUploaded, newTimestampMillis, uploadedIds ); } syncFinished() { return BridgeWrapperXPCOM.#promisify(this.comp.syncFinished); } reset() { return BridgeWrapperXPCOM.#promisify(this.comp.reset); } wipe() { return BridgeWrapperXPCOM.#promisify(this.comp.wipe); } // Converts a XPCOM bridged function that takes a callback into one that returns a // promise. static #promisify(func, ...params) { return new Promise((resolve, reject) => { func(...params, { // This object implicitly implements all three callback interfaces // (`mozIBridgedSyncEngine{Apply, Result}Callback`), because they have // the same methods. The only difference is the type of the argument // passed to `handleSuccess`, which doesn't matter in JS. handleSuccess: resolve, handleError(code, message) { reject(transformError(code, message)); }, }); }); } } /** * A base class used to plug a Rust engine into Sync, and have it work like any * other engine. The constructor takes a bridge as its first argument, which is * a "bridged sync engine", as defined by UniFFI in the application-services * crate. * For backwards compatibility, this can also be an instance of an XPCOM * component class that implements `mozIBridgedSyncEngine`, wrapped in * a `BridgeWrapperXPCOM` wrapper. * (Note that at time of writing, the above is slightly aspirational; the * actual definition of the UniFFI shared bridged engine is still in flux.) * * This class inherits from `SyncEngine`, which has a lot of machinery that we * don't need, but that's fairly easy to override. It would be harder to * reimplement the machinery that we _do_ need here. However, because of that, * this class has lots of methods that do nothing, or return empty data. The * docs above each method explain what it's overriding, and why. * * This class is designed to be subclassed, but the only part that your engine * may want to override is `_trackerObj`. Even then, using the default (no-op) * tracker is fine, because the shape of the `Tracker` interface may not make * sense for all engines. */ function BridgedEngine(name, service) { SyncEngine.call(this, name, service); } BridgedEngine.prototype = { /** * The Rust implemented bridge. Must be set by the engine which subclasses us. */ _bridge: null, /** * The tracker class for this engine. Subclasses may want to override this * with their own tracker, though using the default `Tracker` is fine. */ _trackerObj: Tracker, /** Returns the record class for all bridged engines. */ get _recordObj() { return BridgedRecord; }, set _recordObj(obj) { throw new TypeError("Don't override the record class for bridged engines"); }, /** Returns the store class for all bridged engines. */ get _storeObj() { return BridgedStore; }, set _storeObj(obj) { throw new TypeError("Don't override the store class for bridged engines"); }, /** Returns the storage version for this engine. */ get version() { return this._bridge.storageVersion; }, // Legacy engines allow sync to proceed if some records are too large to // upload (eg, a payload that's bigger than the server's published limits). // If this returns true, we will just skip the record without even attempting // to upload. If this is false, we'll abort the entire batch. // If the engine allows this, it will need to detect this scenario by noticing // the ID is not in the 'success' records reported to `setUploaded`. // (Note that this is not to be confused with the fact server's can currently // reject records as part of a POST - but we hope to remove this ability from // the server API. Note also that this is not bullet-proof - if the count of // records is high, it's possible that we will have committed a previous // batch before we hit the relevant limits, so things might have been written. // We hope to fix this by ensuring batch limits are such that this is // impossible) get allowSkippedRecord() { return this._bridge.allowSkippedRecord; }, /** * Returns the sync ID for this engine. This is exposed for tests, but * Sync code always calls `resetSyncID()` and `ensureCurrentSyncID()`, * not this. * * @returns {String?} The sync ID, or `null` if one isn't set. */ async getSyncID() { // Note that all methods on an XPCOM class instance are automatically bound, // so we don't need to write `this._bridge.getSyncId.bind(this._bridge)`. let syncID = await this._bridge.getSyncId(); return syncID; }, async resetSyncID() { await this._deleteServerCollection(); let newSyncID = await this.resetLocalSyncID(); return newSyncID; }, async resetLocalSyncID() { let newSyncID = await this._bridge.resetSyncId(); return newSyncID; }, async ensureCurrentSyncID(newSyncID) { let assignedSyncID = await this._bridge.ensureCurrentSyncId(newSyncID); return assignedSyncID; }, async getLastSync() { // The bridge defines lastSync as integer ms, but sync itself wants to work // in a float seconds with 2 decimal places. let lastSyncMS = await this._bridge.lastSync(); return Math.round(lastSyncMS / 10) / 100; }, async setLastSync(lastSyncSeconds) { await this._bridge.setLastSync(Math.round(lastSyncSeconds * 1000)); }, /** * Returns the initial changeset for the sync. Bridged engines handle * reconciliation internally, so we don't know what changed until after we've * stored and applied all incoming records. So we return an empty changeset * here, and replace it with the real one in `_processIncoming`. */ async pullChanges() { return {}; }, async trackRemainingChanges() { await this._bridge.syncFinished(); }, /** * Marks a record for a hard-`DELETE` at the end of the sync. The base method * also removes it from the tracker, but we don't use the tracker for that, * so we override the method to just mark. */ _deleteId(id) { this._noteDeletedId(id); }, /** * Always stage incoming records, bypassing the base engine's reconciliation * machinery. */ async _reconcile() { return true; }, async _syncStartup() { await super._syncStartup(); await this._bridge.syncStarted(); }, async _processIncoming(newitems) { await super._processIncoming(newitems); let outgoingBsosAsJSON = await this._bridge.apply(); let changeset = {}; for (let bsoAsJSON of outgoingBsosAsJSON) { this._log.trace("outgoing bso", bsoAsJSON); let record = BridgedRecord.fromOutgoingBso( this.name, JSON.parse(bsoAsJSON) ); changeset[record.id] = { synced: false, record, }; } this._modified.replace(changeset); }, /** * Notify the bridged engine that we've successfully uploaded a batch, so * that it can update its local state. For example, if the engine uses a * mirror and a temp table for outgoing records, it can write the uploaded * records from the outgoing table back to the mirror. */ async _onRecordsWritten(succeeded, failed, serverModifiedTime) { // JS uses seconds but Rust uses milliseconds so we'll need to convert let serverModifiedMS = Math.round(serverModifiedTime * 1000); await this._bridge.setUploaded(Math.floor(serverModifiedMS), succeeded); }, async _createTombstone() { throw new Error("Bridged engines don't support weak uploads"); }, async _createRecord(id) { let change = this._modified.changes[id]; if (!change) { throw new TypeError("Can't create record for unchanged item"); } return change.record; }, async _resetClient() { await super._resetClient(); await this._bridge.reset(); }, }; Object.setPrototypeOf(BridgedEngine.prototype, SyncEngine.prototype); function transformError(code, message) { switch (code) { case Cr.NS_ERROR_ABORT: return new InterruptedError(message); default: return new BridgeError(code, message); } } class BridgedChangeset extends Changeset { // Only `_reconcile` calls `getModifiedTimestamp` and `has`, and the buffered // engine does its own reconciliation. getModifiedTimestamp(id) { throw new Error("Don't use timestamps to resolve bridged engine conflicts"); } has(id) { throw new Error( "Don't use the changeset to resolve bridged engine conflicts" ); } delete(id) { let change = this.changes[id]; if (change) { // Mark the change as synced without removing it from the set. Depending // on how we implement `trackRemainingChanges`, this may not be necessary. // It's copied from the bookmarks changeset now. change.synced = true; } } ids() { let results = []; for (let id in this.changes) { if (!this.changes[id].synced) { results.push(id); } } return results; } }