diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
commit | 6bf0a5cb5034a7e684dcc3500e841785237ce2dd (patch) | |
tree | a68f146d7fa01f0134297619fbe7e33db084e0aa /services/sync/modules/bridged_engine.sys.mjs | |
parent | Initial commit. (diff) | |
download | thunderbird-upstream.tar.xz thunderbird-upstream.zip |
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | services/sync/modules/bridged_engine.sys.mjs | 499 |
1 files changed, 499 insertions, 0 deletions
diff --git a/services/sync/modules/bridged_engine.sys.mjs b/services/sync/modules/bridged_engine.sys.mjs new file mode 100644 index 0000000000..45e5f685cd --- /dev/null +++ b/services/sync/modules/bridged_engine.sys.mjs @@ -0,0 +1,499 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * This file has all the machinery for hooking up bridged engines implemented + * in Rust. It's the JavaScript side of the Golden Gate bridge that connects + * Desktop Sync to a Rust `BridgedEngine`, via the `mozIBridgedSyncEngine` + * XPCOM interface. + * + * Creating a bridged engine only takes a few lines of code, since most of the + * hard work is done on the Rust side. On the JS side, you'll need to subclass + * `BridgedEngine` (instead of `SyncEngine`), supply a `mozIBridgedSyncEngine` + * for your subclass to wrap, and optionally implement and override the tracker. + */ + +import { SyncEngine, Tracker } from "resource://services-sync/engines.sys.mjs"; +import { RawCryptoWrapper } from "resource://services-sync/record.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + Log: "resource://gre/modules/Log.sys.mjs", + PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", +}); + +/** + * A stub store that converts between raw decrypted incoming records and + * envelopes. Since the interface we need is so minimal, this class doesn't + * inherit from the base `Store` implementation...it would take more code to + * override all those behaviors! + * + * This class isn't meant to be subclassed, because bridged engines shouldn't + * override their store classes in `_storeObj`. + */ +class BridgedStore { + constructor(name, engine) { + if (!engine) { + throw new Error("Store must be associated with an Engine instance."); + } + this.engine = engine; + this._log = lazy.Log.repository.getLogger(`Sync.Engine.${name}.Store`); + this._batchChunkSize = 500; + } + + async applyIncomingBatch(records, countTelemetry) { + for (let chunk of lazy.PlacesUtils.chunkArray( + records, + this._batchChunkSize + )) { + let incomingEnvelopesAsJSON = chunk.map(record => + JSON.stringify(record.toIncomingBso()) + ); + this._log.trace("incoming envelopes", incomingEnvelopesAsJSON); + await this.engine._bridge.storeIncoming(incomingEnvelopesAsJSON); + } + // Array of failed records. + return []; + } + + async wipe() { + await this.engine._bridge.wipe(); + } +} + +/** + * A wrapper class to convert between BSOs on the JS side, and envelopes on the + * Rust side. This class intentionally subclasses `RawCryptoWrapper`, because we + * don't want the stringification and parsing machinery in `CryptoWrapper`. + * + * This class isn't meant to be subclassed, because bridged engines shouldn't + * override their record classes in `_recordObj`. + */ +class BridgedRecord extends RawCryptoWrapper { + /** + * Creates an outgoing record from a BSO returned by a bridged engine. + * + * @param {String} collection The collection name. + * @param {Object} bso The outgoing bso (ie, a sync15::bso::OutgoingBso) returned from + * `mozIBridgedSyncEngine::apply`. + * @return {BridgedRecord} A Sync record ready to encrypt and upload. + */ + static fromOutgoingBso(collection, bso) { + // The BSO has already been JSON serialized coming out of Rust, so the + // envelope has been flattened. + if (typeof bso.id != "string") { + throw new TypeError("Outgoing BSO missing ID"); + } + if (typeof bso.payload != "string") { + throw new TypeError("Outgoing BSO missing payload"); + } + let record = new BridgedRecord(collection, bso.id); + record.cleartext = bso.payload; + return record; + } + + transformBeforeEncrypt(cleartext) { + if (typeof cleartext != "string") { + throw new TypeError("Outgoing bridged engine records must be strings"); + } + return cleartext; + } + + transformAfterDecrypt(cleartext) { + if (typeof cleartext != "string") { + throw new TypeError("Incoming bridged engine records must be strings"); + } + return cleartext; + } + + /* + * Converts this incoming record into an envelope to pass to a bridged engine. + * This object must be kept in sync with `sync15::IncomingBso`. + * + * @return {Object} The incoming envelope, to pass to + * `mozIBridgedSyncEngine::storeIncoming`. + */ + toIncomingBso() { + return { + id: this.data.id, + modified: this.data.modified, + payload: this.cleartext, + }; + } +} + +class BridgeError extends Error { + constructor(code, message) { + super(message); + this.name = "BridgeError"; + // TODO: We may want to use a different name for this, since errors with + // a `result` property are treated specially by telemetry, discarding the + // message...but, unlike other `nserror`s, the message is actually useful, + // and we still want to capture it. + this.result = code; + } +} + +class InterruptedError extends Error { + constructor(message) { + super(message); + this.name = "InterruptedError"; + } +} + +/** + * Adapts a `Log.sys.mjs` logger to a `mozIServicesLogSink`. This class is copied + * from `SyncedBookmarksMirror.jsm`. + */ +export class LogAdapter { + constructor(log) { + this.log = log; + } + + get maxLevel() { + let level = this.log.level; + if (level <= lazy.Log.Level.All) { + return Ci.mozIServicesLogSink.LEVEL_TRACE; + } + if (level <= lazy.Log.Level.Info) { + return Ci.mozIServicesLogSink.LEVEL_DEBUG; + } + if (level <= lazy.Log.Level.Warn) { + return Ci.mozIServicesLogSink.LEVEL_WARN; + } + if (level <= lazy.Log.Level.Error) { + return Ci.mozIServicesLogSink.LEVEL_ERROR; + } + return Ci.mozIServicesLogSink.LEVEL_OFF; + } + + trace(message) { + this.log.trace(message); + } + + debug(message) { + this.log.debug(message); + } + + warn(message) { + this.log.warn(message); + } + + error(message) { + this.log.error(message); + } +} + +// This converts the XPCOM-defined, callback-based mozIBridgedSyncEngine to +// a promise-based implementation. +export class BridgeWrapperXPCOM { + constructor(component) { + this.comp = component; + } + + // A few sync, non-callback based attributes. + get storageVersion() { + return this.comp.storageVersion; + } + + get allowSkippedRecord() { + return this.comp.allowSkippedRecord; + } + + get logger() { + return this.comp.logger; + } + + // And the async functions we promisify. + // Note this is `lastSync` via uniffi but `getLastSync` via xpcom + lastSync() { + return BridgeWrapperXPCOM.#promisify(this.comp.getLastSync); + } + + setLastSync(lastSyncMillis) { + return BridgeWrapperXPCOM.#promisify(this.comp.setLastSync, lastSyncMillis); + } + + getSyncId() { + return BridgeWrapperXPCOM.#promisify(this.comp.getSyncId); + } + + resetSyncId() { + return BridgeWrapperXPCOM.#promisify(this.comp.resetSyncId); + } + + ensureCurrentSyncId(newSyncId) { + return BridgeWrapperXPCOM.#promisify( + this.comp.ensureCurrentSyncId, + newSyncId + ); + } + + syncStarted() { + return BridgeWrapperXPCOM.#promisify(this.comp.syncStarted); + } + + storeIncoming(incomingEnvelopesAsJSON) { + return BridgeWrapperXPCOM.#promisify( + this.comp.storeIncoming, + incomingEnvelopesAsJSON + ); + } + + apply() { + return BridgeWrapperXPCOM.#promisify(this.comp.apply); + } + + setUploaded(newTimestampMillis, uploadedIds) { + return BridgeWrapperXPCOM.#promisify( + this.comp.setUploaded, + newTimestampMillis, + uploadedIds + ); + } + + syncFinished() { + return BridgeWrapperXPCOM.#promisify(this.comp.syncFinished); + } + + reset() { + return BridgeWrapperXPCOM.#promisify(this.comp.reset); + } + + wipe() { + return BridgeWrapperXPCOM.#promisify(this.comp.wipe); + } + + // Converts a XPCOM bridged function that takes a callback into one that returns a + // promise. + static #promisify(func, ...params) { + return new Promise((resolve, reject) => { + func(...params, { + // This object implicitly implements all three callback interfaces + // (`mozIBridgedSyncEngine{Apply, Result}Callback`), because they have + // the same methods. The only difference is the type of the argument + // passed to `handleSuccess`, which doesn't matter in JS. + handleSuccess: resolve, + handleError(code, message) { + reject(transformError(code, message)); + }, + }); + }); + } +} + +/** + * A base class used to plug a Rust engine into Sync, and have it work like any + * other engine. The constructor takes a bridge as its first argument, which is + * a "bridged sync engine", as defined by UniFFI in the application-services + * crate. + * For backwards compatibility, this can also be an instance of an XPCOM + * component class that implements `mozIBridgedSyncEngine`, wrapped in + * a `BridgeWrapperXPCOM` wrapper. + * (Note that at time of writing, the above is slightly aspirational; the + * actual definition of the UniFFI shared bridged engine is still in flux.) + * + * This class inherits from `SyncEngine`, which has a lot of machinery that we + * don't need, but that's fairly easy to override. It would be harder to + * reimplement the machinery that we _do_ need here. However, because of that, + * this class has lots of methods that do nothing, or return empty data. The + * docs above each method explain what it's overriding, and why. + * + * This class is designed to be subclassed, but the only part that your engine + * may want to override is `_trackerObj`. Even then, using the default (no-op) + * tracker is fine, because the shape of the `Tracker` interface may not make + * sense for all engines. + */ +export function BridgedEngine(name, service) { + SyncEngine.call(this, name, service); +} + +BridgedEngine.prototype = { + /** + * The Rust implemented bridge. Must be set by the engine which subclasses us. + */ + _bridge: null, + /** + * The tracker class for this engine. Subclasses may want to override this + * with their own tracker, though using the default `Tracker` is fine. + */ + _trackerObj: Tracker, + + /** Returns the record class for all bridged engines. */ + get _recordObj() { + return BridgedRecord; + }, + + set _recordObj(obj) { + throw new TypeError("Don't override the record class for bridged engines"); + }, + + /** Returns the store class for all bridged engines. */ + get _storeObj() { + return BridgedStore; + }, + + set _storeObj(obj) { + throw new TypeError("Don't override the store class for bridged engines"); + }, + + /** Returns the storage version for this engine. */ + get version() { + return this._bridge.storageVersion; + }, + + // Legacy engines allow sync to proceed if some records are too large to + // upload (eg, a payload that's bigger than the server's published limits). + // If this returns true, we will just skip the record without even attempting + // to upload. If this is false, we'll abort the entire batch. + // If the engine allows this, it will need to detect this scenario by noticing + // the ID is not in the 'success' records reported to `setUploaded`. + // (Note that this is not to be confused with the fact server's can currently + // reject records as part of a POST - but we hope to remove this ability from + // the server API. Note also that this is not bullet-proof - if the count of + // records is high, it's possible that we will have committed a previous + // batch before we hit the relevant limits, so things might have been written. + // We hope to fix this by ensuring batch limits are such that this is + // impossible) + get allowSkippedRecord() { + return this._bridge.allowSkippedRecord; + }, + + /** + * Returns the sync ID for this engine. This is exposed for tests, but + * Sync code always calls `resetSyncID()` and `ensureCurrentSyncID()`, + * not this. + * + * @returns {String?} The sync ID, or `null` if one isn't set. + */ + async getSyncID() { + // Note that all methods on an XPCOM class instance are automatically bound, + // so we don't need to write `this._bridge.getSyncId.bind(this._bridge)`. + let syncID = await this._bridge.getSyncId(); + return syncID; + }, + + async resetSyncID() { + await this._deleteServerCollection(); + let newSyncID = await this.resetLocalSyncID(); + return newSyncID; + }, + + async resetLocalSyncID() { + let newSyncID = await this._bridge.resetSyncId(); + return newSyncID; + }, + + async ensureCurrentSyncID(newSyncID) { + let assignedSyncID = await this._bridge.ensureCurrentSyncId(newSyncID); + return assignedSyncID; + }, + + async getLastSync() { + // The bridge defines lastSync as integer ms, but sync itself wants to work + // in a float seconds with 2 decimal places. + let lastSyncMS = await this._bridge.lastSync(); + return Math.round(lastSyncMS / 10) / 100; + }, + + async setLastSync(lastSyncSeconds) { + await this._bridge.setLastSync(Math.round(lastSyncSeconds * 1000)); + }, + + /** + * Returns the initial changeset for the sync. Bridged engines handle + * reconciliation internally, so we don't know what changed until after we've + * stored and applied all incoming records. So we return an empty changeset + * here, and replace it with the real one in `_processIncoming`. + */ + async pullChanges() { + return {}; + }, + + async trackRemainingChanges() { + await this._bridge.syncFinished(); + }, + + /** + * Marks a record for a hard-`DELETE` at the end of the sync. The base method + * also removes it from the tracker, but we don't use the tracker for that, + * so we override the method to just mark. + */ + _deleteId(id) { + this._noteDeletedId(id); + }, + + /** + * Always stage incoming records, bypassing the base engine's reconciliation + * machinery. + */ + async _reconcile() { + return true; + }, + + async _syncStartup() { + await super._syncStartup(); + await this._bridge.syncStarted(); + }, + + async _processIncoming(newitems) { + await super._processIncoming(newitems); + + let outgoingBsosAsJSON = await this._bridge.apply(); + let changeset = {}; + for (let bsoAsJSON of outgoingBsosAsJSON) { + this._log.trace("outgoing bso", bsoAsJSON); + let record = BridgedRecord.fromOutgoingBso( + this.name, + JSON.parse(bsoAsJSON) + ); + changeset[record.id] = { + synced: false, + record, + }; + } + this._modified.replace(changeset); + }, + + /** + * Notify the bridged engine that we've successfully uploaded a batch, so + * that it can update its local state. For example, if the engine uses a + * mirror and a temp table for outgoing records, it can write the uploaded + * records from the outgoing table back to the mirror. + */ + async _onRecordsWritten(succeeded, failed, serverModifiedTime) { + // JS uses seconds but Rust uses milliseconds so we'll need to convert + let serverModifiedMS = Math.round(serverModifiedTime * 1000); + await this._bridge.setUploaded(Math.floor(serverModifiedMS), succeeded); + }, + + async _createTombstone() { + throw new Error("Bridged engines don't support weak uploads"); + }, + + async _createRecord(id) { + let change = this._modified.changes[id]; + if (!change) { + throw new TypeError("Can't create record for unchanged item"); + } + return change.record; + }, + + async _resetClient() { + await super._resetClient(); + await this._bridge.reset(); + }, +}; +Object.setPrototypeOf(BridgedEngine.prototype, SyncEngine.prototype); + +function transformError(code, message) { + switch (code) { + case Cr.NS_ERROR_ABORT: + return new InterruptedError(message); + + default: + return new BridgeError(code, message); + } +} |