summaryrefslogtreecommitdiffstats
path: root/services/sync/modules/bridged_engine.sys.mjs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 17:32:43 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 17:32:43 +0000
commit6bf0a5cb5034a7e684dcc3500e841785237ce2dd (patch)
treea68f146d7fa01f0134297619fbe7e33db084e0aa /services/sync/modules/bridged_engine.sys.mjs
parentInitial commit. (diff)
downloadthunderbird-upstream.tar.xz
thunderbird-upstream.zip
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--services/sync/modules/bridged_engine.sys.mjs499
1 files changed, 499 insertions, 0 deletions
diff --git a/services/sync/modules/bridged_engine.sys.mjs b/services/sync/modules/bridged_engine.sys.mjs
new file mode 100644
index 0000000000..45e5f685cd
--- /dev/null
+++ b/services/sync/modules/bridged_engine.sys.mjs
@@ -0,0 +1,499 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * This file has all the machinery for hooking up bridged engines implemented
+ * in Rust. It's the JavaScript side of the Golden Gate bridge that connects
+ * Desktop Sync to a Rust `BridgedEngine`, via the `mozIBridgedSyncEngine`
+ * XPCOM interface.
+ *
+ * Creating a bridged engine only takes a few lines of code, since most of the
+ * hard work is done on the Rust side. On the JS side, you'll need to subclass
+ * `BridgedEngine` (instead of `SyncEngine`), supply a `mozIBridgedSyncEngine`
+ * for your subclass to wrap, and optionally implement and override the tracker.
+ */
+
+import { SyncEngine, Tracker } from "resource://services-sync/engines.sys.mjs";
+import { RawCryptoWrapper } from "resource://services-sync/record.sys.mjs";
+
+const lazy = {};
+
+ChromeUtils.defineESModuleGetters(lazy, {
+ Log: "resource://gre/modules/Log.sys.mjs",
+ PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs",
+});
+
+/**
+ * A stub store that converts between raw decrypted incoming records and
+ * envelopes. Since the interface we need is so minimal, this class doesn't
+ * inherit from the base `Store` implementation...it would take more code to
+ * override all those behaviors!
+ *
+ * This class isn't meant to be subclassed, because bridged engines shouldn't
+ * override their store classes in `_storeObj`.
+ */
+class BridgedStore {
+ constructor(name, engine) {
+ if (!engine) {
+ throw new Error("Store must be associated with an Engine instance.");
+ }
+ this.engine = engine;
+ this._log = lazy.Log.repository.getLogger(`Sync.Engine.${name}.Store`);
+ this._batchChunkSize = 500;
+ }
+
+ async applyIncomingBatch(records, countTelemetry) {
+ for (let chunk of lazy.PlacesUtils.chunkArray(
+ records,
+ this._batchChunkSize
+ )) {
+ let incomingEnvelopesAsJSON = chunk.map(record =>
+ JSON.stringify(record.toIncomingBso())
+ );
+ this._log.trace("incoming envelopes", incomingEnvelopesAsJSON);
+ await this.engine._bridge.storeIncoming(incomingEnvelopesAsJSON);
+ }
+ // Array of failed records.
+ return [];
+ }
+
+ async wipe() {
+ await this.engine._bridge.wipe();
+ }
+}
+
+/**
+ * A wrapper class to convert between BSOs on the JS side, and envelopes on the
+ * Rust side. This class intentionally subclasses `RawCryptoWrapper`, because we
+ * don't want the stringification and parsing machinery in `CryptoWrapper`.
+ *
+ * This class isn't meant to be subclassed, because bridged engines shouldn't
+ * override their record classes in `_recordObj`.
+ */
+class BridgedRecord extends RawCryptoWrapper {
+ /**
+ * Creates an outgoing record from a BSO returned by a bridged engine.
+ *
+ * @param {String} collection The collection name.
+ * @param {Object} bso The outgoing bso (ie, a sync15::bso::OutgoingBso) returned from
+ * `mozIBridgedSyncEngine::apply`.
+ * @return {BridgedRecord} A Sync record ready to encrypt and upload.
+ */
+ static fromOutgoingBso(collection, bso) {
+ // The BSO has already been JSON serialized coming out of Rust, so the
+ // envelope has been flattened.
+ if (typeof bso.id != "string") {
+ throw new TypeError("Outgoing BSO missing ID");
+ }
+ if (typeof bso.payload != "string") {
+ throw new TypeError("Outgoing BSO missing payload");
+ }
+ let record = new BridgedRecord(collection, bso.id);
+ record.cleartext = bso.payload;
+ return record;
+ }
+
+ transformBeforeEncrypt(cleartext) {
+ if (typeof cleartext != "string") {
+ throw new TypeError("Outgoing bridged engine records must be strings");
+ }
+ return cleartext;
+ }
+
+ transformAfterDecrypt(cleartext) {
+ if (typeof cleartext != "string") {
+ throw new TypeError("Incoming bridged engine records must be strings");
+ }
+ return cleartext;
+ }
+
+ /*
+ * Converts this incoming record into an envelope to pass to a bridged engine.
+ * This object must be kept in sync with `sync15::IncomingBso`.
+ *
+ * @return {Object} The incoming envelope, to pass to
+ * `mozIBridgedSyncEngine::storeIncoming`.
+ */
+ toIncomingBso() {
+ return {
+ id: this.data.id,
+ modified: this.data.modified,
+ payload: this.cleartext,
+ };
+ }
+}
+
+class BridgeError extends Error {
+ constructor(code, message) {
+ super(message);
+ this.name = "BridgeError";
+ // TODO: We may want to use a different name for this, since errors with
+ // a `result` property are treated specially by telemetry, discarding the
+ // message...but, unlike other `nserror`s, the message is actually useful,
+ // and we still want to capture it.
+ this.result = code;
+ }
+}
+
+class InterruptedError extends Error {
+ constructor(message) {
+ super(message);
+ this.name = "InterruptedError";
+ }
+}
+
+/**
+ * Adapts a `Log.sys.mjs` logger to a `mozIServicesLogSink`. This class is copied
+ * from `SyncedBookmarksMirror.jsm`.
+ */
+export class LogAdapter {
+ constructor(log) {
+ this.log = log;
+ }
+
+ get maxLevel() {
+ let level = this.log.level;
+ if (level <= lazy.Log.Level.All) {
+ return Ci.mozIServicesLogSink.LEVEL_TRACE;
+ }
+ if (level <= lazy.Log.Level.Info) {
+ return Ci.mozIServicesLogSink.LEVEL_DEBUG;
+ }
+ if (level <= lazy.Log.Level.Warn) {
+ return Ci.mozIServicesLogSink.LEVEL_WARN;
+ }
+ if (level <= lazy.Log.Level.Error) {
+ return Ci.mozIServicesLogSink.LEVEL_ERROR;
+ }
+ return Ci.mozIServicesLogSink.LEVEL_OFF;
+ }
+
+ trace(message) {
+ this.log.trace(message);
+ }
+
+ debug(message) {
+ this.log.debug(message);
+ }
+
+ warn(message) {
+ this.log.warn(message);
+ }
+
+ error(message) {
+ this.log.error(message);
+ }
+}
+
+// This converts the XPCOM-defined, callback-based mozIBridgedSyncEngine to
+// a promise-based implementation.
+export class BridgeWrapperXPCOM {
+ constructor(component) {
+ this.comp = component;
+ }
+
+ // A few sync, non-callback based attributes.
+ get storageVersion() {
+ return this.comp.storageVersion;
+ }
+
+ get allowSkippedRecord() {
+ return this.comp.allowSkippedRecord;
+ }
+
+ get logger() {
+ return this.comp.logger;
+ }
+
+ // And the async functions we promisify.
+ // Note this is `lastSync` via uniffi but `getLastSync` via xpcom
+ lastSync() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.getLastSync);
+ }
+
+ setLastSync(lastSyncMillis) {
+ return BridgeWrapperXPCOM.#promisify(this.comp.setLastSync, lastSyncMillis);
+ }
+
+ getSyncId() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.getSyncId);
+ }
+
+ resetSyncId() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.resetSyncId);
+ }
+
+ ensureCurrentSyncId(newSyncId) {
+ return BridgeWrapperXPCOM.#promisify(
+ this.comp.ensureCurrentSyncId,
+ newSyncId
+ );
+ }
+
+ syncStarted() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.syncStarted);
+ }
+
+ storeIncoming(incomingEnvelopesAsJSON) {
+ return BridgeWrapperXPCOM.#promisify(
+ this.comp.storeIncoming,
+ incomingEnvelopesAsJSON
+ );
+ }
+
+ apply() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.apply);
+ }
+
+ setUploaded(newTimestampMillis, uploadedIds) {
+ return BridgeWrapperXPCOM.#promisify(
+ this.comp.setUploaded,
+ newTimestampMillis,
+ uploadedIds
+ );
+ }
+
+ syncFinished() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.syncFinished);
+ }
+
+ reset() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.reset);
+ }
+
+ wipe() {
+ return BridgeWrapperXPCOM.#promisify(this.comp.wipe);
+ }
+
+ // Converts a XPCOM bridged function that takes a callback into one that returns a
+ // promise.
+ static #promisify(func, ...params) {
+ return new Promise((resolve, reject) => {
+ func(...params, {
+ // This object implicitly implements all three callback interfaces
+ // (`mozIBridgedSyncEngine{Apply, Result}Callback`), because they have
+ // the same methods. The only difference is the type of the argument
+ // passed to `handleSuccess`, which doesn't matter in JS.
+ handleSuccess: resolve,
+ handleError(code, message) {
+ reject(transformError(code, message));
+ },
+ });
+ });
+ }
+}
+
+/**
+ * A base class used to plug a Rust engine into Sync, and have it work like any
+ * other engine. The constructor takes a bridge as its first argument, which is
+ * a "bridged sync engine", as defined by UniFFI in the application-services
+ * crate.
+ * For backwards compatibility, this can also be an instance of an XPCOM
+ * component class that implements `mozIBridgedSyncEngine`, wrapped in
+ * a `BridgeWrapperXPCOM` wrapper.
+ * (Note that at time of writing, the above is slightly aspirational; the
+ * actual definition of the UniFFI shared bridged engine is still in flux.)
+ *
+ * This class inherits from `SyncEngine`, which has a lot of machinery that we
+ * don't need, but that's fairly easy to override. It would be harder to
+ * reimplement the machinery that we _do_ need here. However, because of that,
+ * this class has lots of methods that do nothing, or return empty data. The
+ * docs above each method explain what it's overriding, and why.
+ *
+ * This class is designed to be subclassed, but the only part that your engine
+ * may want to override is `_trackerObj`. Even then, using the default (no-op)
+ * tracker is fine, because the shape of the `Tracker` interface may not make
+ * sense for all engines.
+ */
+export function BridgedEngine(name, service) {
+ SyncEngine.call(this, name, service);
+}
+
+BridgedEngine.prototype = {
+ /**
+ * The Rust implemented bridge. Must be set by the engine which subclasses us.
+ */
+ _bridge: null,
+ /**
+ * The tracker class for this engine. Subclasses may want to override this
+ * with their own tracker, though using the default `Tracker` is fine.
+ */
+ _trackerObj: Tracker,
+
+ /** Returns the record class for all bridged engines. */
+ get _recordObj() {
+ return BridgedRecord;
+ },
+
+ set _recordObj(obj) {
+ throw new TypeError("Don't override the record class for bridged engines");
+ },
+
+ /** Returns the store class for all bridged engines. */
+ get _storeObj() {
+ return BridgedStore;
+ },
+
+ set _storeObj(obj) {
+ throw new TypeError("Don't override the store class for bridged engines");
+ },
+
+ /** Returns the storage version for this engine. */
+ get version() {
+ return this._bridge.storageVersion;
+ },
+
+ // Legacy engines allow sync to proceed if some records are too large to
+ // upload (eg, a payload that's bigger than the server's published limits).
+ // If this returns true, we will just skip the record without even attempting
+ // to upload. If this is false, we'll abort the entire batch.
+ // If the engine allows this, it will need to detect this scenario by noticing
+ // the ID is not in the 'success' records reported to `setUploaded`.
+ // (Note that this is not to be confused with the fact server's can currently
+ // reject records as part of a POST - but we hope to remove this ability from
+ // the server API. Note also that this is not bullet-proof - if the count of
+ // records is high, it's possible that we will have committed a previous
+ // batch before we hit the relevant limits, so things might have been written.
+ // We hope to fix this by ensuring batch limits are such that this is
+ // impossible)
+ get allowSkippedRecord() {
+ return this._bridge.allowSkippedRecord;
+ },
+
+ /**
+ * Returns the sync ID for this engine. This is exposed for tests, but
+ * Sync code always calls `resetSyncID()` and `ensureCurrentSyncID()`,
+ * not this.
+ *
+ * @returns {String?} The sync ID, or `null` if one isn't set.
+ */
+ async getSyncID() {
+ // Note that all methods on an XPCOM class instance are automatically bound,
+ // so we don't need to write `this._bridge.getSyncId.bind(this._bridge)`.
+ let syncID = await this._bridge.getSyncId();
+ return syncID;
+ },
+
+ async resetSyncID() {
+ await this._deleteServerCollection();
+ let newSyncID = await this.resetLocalSyncID();
+ return newSyncID;
+ },
+
+ async resetLocalSyncID() {
+ let newSyncID = await this._bridge.resetSyncId();
+ return newSyncID;
+ },
+
+ async ensureCurrentSyncID(newSyncID) {
+ let assignedSyncID = await this._bridge.ensureCurrentSyncId(newSyncID);
+ return assignedSyncID;
+ },
+
+ async getLastSync() {
+ // The bridge defines lastSync as integer ms, but sync itself wants to work
+ // in a float seconds with 2 decimal places.
+ let lastSyncMS = await this._bridge.lastSync();
+ return Math.round(lastSyncMS / 10) / 100;
+ },
+
+ async setLastSync(lastSyncSeconds) {
+ await this._bridge.setLastSync(Math.round(lastSyncSeconds * 1000));
+ },
+
+ /**
+ * Returns the initial changeset for the sync. Bridged engines handle
+ * reconciliation internally, so we don't know what changed until after we've
+ * stored and applied all incoming records. So we return an empty changeset
+ * here, and replace it with the real one in `_processIncoming`.
+ */
+ async pullChanges() {
+ return {};
+ },
+
+ async trackRemainingChanges() {
+ await this._bridge.syncFinished();
+ },
+
+ /**
+ * Marks a record for a hard-`DELETE` at the end of the sync. The base method
+ * also removes it from the tracker, but we don't use the tracker for that,
+ * so we override the method to just mark.
+ */
+ _deleteId(id) {
+ this._noteDeletedId(id);
+ },
+
+ /**
+ * Always stage incoming records, bypassing the base engine's reconciliation
+ * machinery.
+ */
+ async _reconcile() {
+ return true;
+ },
+
+ async _syncStartup() {
+ await super._syncStartup();
+ await this._bridge.syncStarted();
+ },
+
+ async _processIncoming(newitems) {
+ await super._processIncoming(newitems);
+
+ let outgoingBsosAsJSON = await this._bridge.apply();
+ let changeset = {};
+ for (let bsoAsJSON of outgoingBsosAsJSON) {
+ this._log.trace("outgoing bso", bsoAsJSON);
+ let record = BridgedRecord.fromOutgoingBso(
+ this.name,
+ JSON.parse(bsoAsJSON)
+ );
+ changeset[record.id] = {
+ synced: false,
+ record,
+ };
+ }
+ this._modified.replace(changeset);
+ },
+
+ /**
+ * Notify the bridged engine that we've successfully uploaded a batch, so
+ * that it can update its local state. For example, if the engine uses a
+ * mirror and a temp table for outgoing records, it can write the uploaded
+ * records from the outgoing table back to the mirror.
+ */
+ async _onRecordsWritten(succeeded, failed, serverModifiedTime) {
+ // JS uses seconds but Rust uses milliseconds so we'll need to convert
+ let serverModifiedMS = Math.round(serverModifiedTime * 1000);
+ await this._bridge.setUploaded(Math.floor(serverModifiedMS), succeeded);
+ },
+
+ async _createTombstone() {
+ throw new Error("Bridged engines don't support weak uploads");
+ },
+
+ async _createRecord(id) {
+ let change = this._modified.changes[id];
+ if (!change) {
+ throw new TypeError("Can't create record for unchanged item");
+ }
+ return change.record;
+ },
+
+ async _resetClient() {
+ await super._resetClient();
+ await this._bridge.reset();
+ },
+};
+Object.setPrototypeOf(BridgedEngine.prototype, SyncEngine.prototype);
+
+function transformError(code, message) {
+ switch (code) {
+ case Cr.NS_ERROR_ABORT:
+ return new InterruptedError(message);
+
+ default:
+ return new BridgeError(code, message);
+ }
+}