summaryrefslogtreecommitdiffstats
path: root/services/sync/modules/bridged_engine.sys.mjs
blob: 45e5f685cd4c81d798a7d53c72435e4bae64c5fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/**
 * This file has all the machinery for hooking up bridged engines implemented
 * in Rust. It's the JavaScript side of the Golden Gate bridge that connects
 * Desktop Sync to a Rust `BridgedEngine`, via the `mozIBridgedSyncEngine`
 * XPCOM interface.
 *
 * Creating a bridged engine only takes a few lines of code, since most of the
 * hard work is done on the Rust side. On the JS side, you'll need to subclass
 * `BridgedEngine` (instead of `SyncEngine`), supply a `mozIBridgedSyncEngine`
 * for your subclass to wrap, and optionally implement and override the tracker.
 */

import { SyncEngine, Tracker } from "resource://services-sync/engines.sys.mjs";
import { RawCryptoWrapper } from "resource://services-sync/record.sys.mjs";

const lazy = {};

ChromeUtils.defineESModuleGetters(lazy, {
  Log: "resource://gre/modules/Log.sys.mjs",
  PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs",
});

/**
 * A stub store that converts between raw decrypted incoming records and
 * envelopes. Since the interface we need is so minimal, this class doesn't
 * inherit from the base `Store` implementation...it would take more code to
 * override all those behaviors!
 *
 * This class isn't meant to be subclassed, because bridged engines shouldn't
 * override their store classes in `_storeObj`.
 */
class BridgedStore {
  /**
   * @param {String} name   The engine name; only used to name this store's
   *                        logger (`Sync.Engine.<name>.Store`).
   * @param {Object} engine The engine that owns this store. Every store
   *                        operation is forwarded to `engine._bridge`.
   * @throws {Error} If `engine` is missing.
   */
  constructor(name, engine) {
    if (!engine) {
      throw new Error("Store must be associated with an Engine instance.");
    }
    this.engine = engine;
    this._log = lazy.Log.repository.getLogger(`Sync.Engine.${name}.Store`);
    // Upper bound on how many envelopes are handed to the bridge per
    // `storeIncoming` call.
    this._batchChunkSize = 500;
  }

  /**
   * Serializes incoming records to JSON envelopes and passes them to the
   * bridge, one chunk of at most `_batchChunkSize` records at a time. Chunks
   * are stored sequentially; each `storeIncoming` call is awaited before the
   * next chunk is serialized.
   *
   * @param  {Object[]} records        Records exposing `toIncomingBso()`.
   * @param  {*}        countTelemetry Not used by this store.
   * @return {Array} The failed records. Always empty here.
   */
  async applyIncomingBatch(records, countTelemetry) {
    const toEnvelopeJSON = incoming => JSON.stringify(incoming.toIncomingBso());
    const batches = lazy.PlacesUtils.chunkArray(records, this._batchChunkSize);

    for (const batch of batches) {
      const envelopes = batch.map(toEnvelopeJSON);
      this._log.trace("incoming envelopes", envelopes);
      await this.engine._bridge.storeIncoming(envelopes);
    }

    // This store never reports individual failures.
    return [];
  }

  /** Forwards a wipe request to the bridge. */
  async wipe() {
    await this.engine._bridge.wipe();
  }
}

/**
 * A wrapper class to convert between BSOs on the JS side, and envelopes on the
 * Rust side. This class intentionally subclasses `RawCryptoWrapper`, because we
 * don't want the stringification and parsing machinery in `CryptoWrapper`.
 *
 * This class isn't meant to be subclassed, because bridged engines shouldn't
 * override their record classes in `_recordObj`.
 */
class BridgedRecord extends RawCryptoWrapper {
  /**
   * Builds an outgoing record from a BSO returned by a bridged engine.
   *
   * @param  {String} collection The collection name.
   * @param  {Object} bso        The outgoing BSO (ie, a
   *                             sync15::bso::OutgoingBso) returned from
   *                             `mozIBridgedSyncEngine::apply`, after JSON
   *                             parsing.
   * @return {BridgedRecord}     A Sync record ready to encrypt and upload.
   * @throws {TypeError} If `bso.id` or `bso.payload` is not a string.
   */
  static fromOutgoingBso(collection, bso) {
    // Rust hands the BSO back already JSON-serialized, so the envelope is
    // flattened: both fields below are expected as top-level strings. The ID
    // is validated before the payload.
    const requiredStrings = [
      ["id", "Outgoing BSO missing ID"],
      ["payload", "Outgoing BSO missing payload"],
    ];
    for (const [field, complaint] of requiredStrings) {
      if (typeof bso[field] !== "string") {
        throw new TypeError(complaint);
      }
    }

    const outgoing = new BridgedRecord(collection, bso.id);
    outgoing.cleartext = bso.payload;
    return outgoing;
  }

  /**
   * Passes the outgoing cleartext through unchanged, after checking its type.
   *
   * @param  {String} cleartext The payload about to be encrypted.
   * @return {String} The same payload.
   * @throws {TypeError} If the payload is not a string.
   */
  transformBeforeEncrypt(cleartext) {
    if (typeof cleartext === "string") {
      return cleartext;
    }
    throw new TypeError("Outgoing bridged engine records must be strings");
  }

  /**
   * Passes the decrypted cleartext through unchanged, after checking its type.
   *
   * @param  {String} cleartext The payload that was just decrypted.
   * @return {String} The same payload.
   * @throws {TypeError} If the payload is not a string.
   */
  transformAfterDecrypt(cleartext) {
    if (typeof cleartext === "string") {
      return cleartext;
    }
    throw new TypeError("Incoming bridged engine records must be strings");
  }

  /**
   * Converts this incoming record into an envelope to pass to a bridged engine.
   * This object must be kept in sync with `sync15::IncomingBso`.
   *
   * @return {Object} The incoming envelope (`id`, `modified`, `payload`), to
   *                  pass to `mozIBridgedSyncEngine::storeIncoming`.
   */
  toIncomingBso() {
    const { id, modified } = this.data;
    return { id, modified, payload: this.cleartext };
  }
}

/**
 * An error reported by a bridge, carrying the reported code alongside the
 * message.
 */
class BridgeError extends Error {
  name = "BridgeError";

  /**
   * @param {*}      code    The error code; exposed as `result`.
   * @param {String} message The human-readable error message.
   */
  constructor(code, message) {
    super(message);
    // TODO: We may want to use a different name for this, since errors with
    // a `result` property are treated specially by telemetry, discarding the
    // message...but, unlike other `nserror`s, the message is actually useful,
    // and we still want to capture it.
    this.result = code;
  }
}

/** The error type used when a bridge operation reports that it was aborted. */
class InterruptedError extends Error {
  name = "InterruptedError";

  /**
   * @param {String} message The human-readable error message.
   */
  constructor(message) {
    super(message);
  }
}

/**
 * Adapts a `Log.sys.mjs` logger to a `mozIServicesLogSink`. This class is copied
 * from `SyncedBookmarksMirror.jsm`.
 */
export class LogAdapter {
  /**
   * @param {Object} log The logger that receives every forwarded message and
   *                     whose `level` drives `maxLevel`.
   */
  constructor(log) {
    this.log = log;
  }

  /**
   * The most verbose `mozIServicesLogSink` level worth emitting, derived from
   * the wrapped logger's current `level`. The first bucket whose ceiling is at
   * or above the logger's level wins; a level above every ceiling (or one that
   * doesn't compare at all) yields `LEVEL_OFF`.
   *
   * NOTE(review): the first ceiling is `Level.All`, not `Level.Trace`.
   * Presumably `Level.Trace` is numerically above `Level.All`, in which case a
   * logger set to exactly Trace falls into the DEBUG bucket - confirm that is
   * intended.
   */
  get maxLevel() {
    const current = this.log.level;
    const { Level } = lazy.Log;
    const sink = Ci.mozIServicesLogSink;

    const buckets = [
      [Level.All, sink.LEVEL_TRACE],
      [Level.Info, sink.LEVEL_DEBUG],
      [Level.Warn, sink.LEVEL_WARN],
      [Level.Error, sink.LEVEL_ERROR],
    ];
    const match = buckets.find(([ceiling]) => current <= ceiling);
    return match ? match[1] : sink.LEVEL_OFF;
  }

  /** Forwards a trace-level message to the wrapped logger. */
  trace(message) {
    this.log.trace(message);
  }

  /** Forwards a debug-level message to the wrapped logger. */
  debug(message) {
    this.log.debug(message);
  }

  /** Forwards a warning to the wrapped logger. */
  warn(message) {
    this.log.warn(message);
  }

  /** Forwards an error to the wrapped logger. */
  error(message) {
    this.log.error(message);
  }
}

// This converts the XPCOM-defined, callback-based mozIBridgedSyncEngine to
// a promise-based implementation.
export class BridgeWrapperXPCOM {
  /**
   * @param {Object} component The callback-based `mozIBridgedSyncEngine`
   *                           implementation to wrap.
   */
  constructor(component) {
    this.comp = component;
  }

  // A few sync, non-callback based attributes, read straight through from the
  // wrapped component.
  get storageVersion() {
    return this.comp.storageVersion;
  }

  get allowSkippedRecord() {
    return this.comp.allowSkippedRecord;
  }

  get logger() {
    return this.comp.logger;
  }

  // And the async functions we promisify. Each returns a promise that resolves
  // with whatever the component passes to `handleSuccess`, or rejects with the
  // error produced by `transformError` for `handleError`.

  // Note this is `lastSync` via uniffi but `getLastSync` via xpcom
  lastSync() {
    return this.#call("getLastSync");
  }

  setLastSync(lastSyncMillis) {
    return this.#call("setLastSync", lastSyncMillis);
  }

  getSyncId() {
    return this.#call("getSyncId");
  }

  resetSyncId() {
    return this.#call("resetSyncId");
  }

  ensureCurrentSyncId(newSyncId) {
    return this.#call("ensureCurrentSyncId", newSyncId);
  }

  syncStarted() {
    return this.#call("syncStarted");
  }

  storeIncoming(incomingEnvelopesAsJSON) {
    return this.#call("storeIncoming", incomingEnvelopesAsJSON);
  }

  apply() {
    return this.#call("apply");
  }

  setUploaded(newTimestampMillis, uploadedIds) {
    return this.#call("setUploaded", newTimestampMillis, uploadedIds);
  }

  syncFinished() {
    return this.#call("syncFinished");
  }

  reset() {
    return this.#call("reset");
  }

  wipe() {
    return this.#call("wipe");
  }

  /**
   * Invokes a callback-taking method of the wrapped component and adapts it to
   * a promise.
   *
   * The method is called with the component as its receiver. Methods that are
   * already bound ignore the receiver, so they behave as before; components
   * implemented in plain JS (or test doubles) that rely on `this` now work
   * too, instead of seeing `this === undefined`.
   *
   * @param  {String} methodName Name of the component method to call.
   * @param  {...*}   params     Arguments to pass ahead of the callback.
   * @return {Promise} Resolves with the `handleSuccess` argument; rejects
   *                   with the transformed `handleError` arguments, or with
   *                   whatever the method throws synchronously.
   */
  #call(methodName, ...params) {
    const component = this.comp;
    // Looked up eagerly so that a missing component still throws
    // synchronously from the public method.
    const method = component[methodName];
    return new Promise((resolve, reject) => {
      method.call(component, ...params, {
        // This object implicitly implements all three callback interfaces
        // (`mozIBridgedSyncEngine{Apply, Result}Callback`), because they have
        // the same methods. The only difference is the type of the argument
        // passed to `handleSuccess`, which doesn't matter in JS.
        handleSuccess: resolve,
        handleError(code, message) {
          reject(transformError(code, message));
        },
      });
    });
  }
}

/**
 * A base class used to plug a Rust engine into Sync, and have it work like any
 * other engine. The constructor takes a bridge as its first argument, which is
 * a "bridged sync engine", as defined by UniFFI in the application-services
 * crate.
 * For backwards compatibility, this can also be an instance of an XPCOM
 * component class that implements `mozIBridgedSyncEngine`, wrapped in
 * a `BridgeWrapperXPCOM` wrapper.
 * (Note that at time of writing, the above is slightly aspirational; the
 * actual definition of the UniFFI shared bridged engine is still in flux.)
 *
 * This class inherits from `SyncEngine`, which has a lot of machinery that we
 * don't need, but that's fairly easy to override. It would be harder to
 * reimplement the machinery that we _do_ need here. However, because of that,
 * this class has lots of methods that do nothing, or return empty data. The
 * docs above each method explain what it's overriding, and why.
 *
 * This class is designed to be subclassed, but the only part that your engine
 * may want to override is `_trackerObj`. Even then, using the default (no-op)
 * tracker is fine, because the shape of the `Tracker` interface may not make
 * sense for all engines.
 */
export function BridgedEngine(name, service) {
  // Run the base `SyncEngine` constructor against this instance, forwarding
  // exactly the engine name and the service.
  Reflect.apply(SyncEngine, this, [name, service]);
}

BridgedEngine.prototype = {
  /**
   * The Rust implemented bridge. Must be set by the engine which subclasses us.
   */
  _bridge: null,

  /**
   * The tracker class for this engine. Subclasses may want to override this
   * with their own tracker, though using the default `Tracker` is fine.
   */
  _trackerObj: Tracker,

  /** The record class shared by all bridged engines; it can't be replaced. */
  get _recordObj() {
    return BridgedRecord;
  },

  set _recordObj(_ignored) {
    throw new TypeError("Don't override the record class for bridged engines");
  },

  /** The store class shared by all bridged engines; it can't be replaced. */
  get _storeObj() {
    return BridgedStore;
  },

  set _storeObj(_ignored) {
    throw new TypeError("Don't override the store class for bridged engines");
  },

  /** The storage version for this engine, as reported by the bridge. */
  get version() {
    return this._bridge.storageVersion;
  },

  // Legacy engines allow sync to proceed if some records are too large to
  // upload (eg, a payload that's bigger than the server's published limits).
  // If this returns true, we will just skip the record without even attempting
  // to upload. If this is false, we'll abort the entire batch.
  // If the engine allows this, it will need to detect this scenario by noticing
  // the ID is not in the 'success' records reported to `setUploaded`.
  // (Note that this is not to be confused with the fact that servers can
  // currently reject records as part of a POST - but we hope to remove this
  // ability from the server API. Note also that this is not bullet-proof - if
  // the count of records is high, it's possible that we will have committed a
  // previous batch before we hit the relevant limits, so things might have
  // been written. We hope to fix this by ensuring batch limits are such that
  // this is impossible.)
  get allowSkippedRecord() {
    return this._bridge.allowSkippedRecord;
  },

  /**
   * Returns the sync ID for this engine. This is exposed for tests, but
   * Sync code always calls `resetSyncID()` and `ensureCurrentSyncID()`,
   * not this.
   *
   * @returns {String?} The sync ID, or `null` if one isn't set.
   */
  async getSyncID() {
    // Methods on an XPCOM class instance are automatically bound, so calling
    // through `this._bridge` needs no explicit `bind`.
    return await this._bridge.getSyncId();
  },

  /**
   * Deletes the server collection, then resets the local sync ID.
   *
   * @returns {String} The new sync ID reported by the bridge.
   */
  async resetSyncID() {
    await this._deleteServerCollection();
    return await this.resetLocalSyncID();
  },

  /**
   * Asks the bridge to reset its sync ID.
   *
   * @returns {String} The new sync ID reported by the bridge.
   */
  async resetLocalSyncID() {
    return await this._bridge.resetSyncId();
  },

  /**
   * Passes `newSyncID` to the bridge's `ensureCurrentSyncId`.
   *
   * @returns {String} The sync ID the bridge reports back.
   */
  async ensureCurrentSyncID(newSyncID) {
    return await this._bridge.ensureCurrentSyncId(newSyncID);
  },

  /**
   * @returns {Number} The last sync time in seconds, with two decimal places.
   */
  async getLastSync() {
    // The bridge stores integer milliseconds; Sync works in float seconds
    // rounded to hundredths.
    const millis = await this._bridge.lastSync();
    return Math.round(millis / 10) / 100;
  },

  /**
   * @param {Number} lastSyncSeconds The last sync time, in seconds. It is
   *                                 rounded to whole milliseconds for the
   *                                 bridge.
   */
  async setLastSync(lastSyncSeconds) {
    const millis = Math.round(lastSyncSeconds * 1000);
    await this._bridge.setLastSync(millis);
  },

  /**
   * Returns the initial changeset for the sync. Bridged engines handle
   * reconciliation internally, so we don't know what changed until after we've
   * stored and applied all incoming records. So we return an empty changeset
   * here, and replace it with the real one in `_processIncoming`.
   */
  async pullChanges() {
    return {};
  },

  /** Tells the bridge that the sync has finished. */
  async trackRemainingChanges() {
    await this._bridge.syncFinished();
  },

  /**
   * Marks a record for a hard-`DELETE` at the end of the sync. The base method
   * also removes it from the tracker, but we don't use the tracker for that,
   * so we override the method to just mark.
   */
  _deleteId(id) {
    this._noteDeletedId(id);
  },

  /**
   * Always stage incoming records, bypassing the base engine's reconciliation
   * machinery.
   */
  async _reconcile() {
    return true;
  },

  /** Runs the base startup, then tells the bridge that a sync has started. */
  async _syncStartup() {
    await super._syncStartup();
    await this._bridge.syncStarted();
  },

  /**
   * Runs the base incoming processing, then asks the bridge to apply what was
   * stored. The outgoing BSOs it returns become the engine's changeset,
   * replacing whatever `_modified` held before.
   */
  async _processIncoming(newitems) {
    await super._processIncoming(newitems);

    const serializedBsos = await this._bridge.apply();
    const pending = {};
    for (const serialized of serializedBsos) {
      this._log.trace("outgoing bso", serialized);
      const collection = this.name;
      const record = BridgedRecord.fromOutgoingBso(
        collection,
        JSON.parse(serialized)
      );
      pending[record.id] = { synced: false, record };
    }
    this._modified.replace(pending);
  },

  /**
   * Notify the bridged engine that we've successfully uploaded a batch, so
   * that it can update its local state. For example, if the engine uses a
   * mirror and a temp table for outgoing records, it can write the uploaded
   * records from the outgoing table back to the mirror.
   *
   * Only `succeeded` and the server timestamp are forwarded; `failed` is not
   * passed to the bridge.
   */
  async _onRecordsWritten(succeeded, failed, serverModifiedTime) {
    // JS uses seconds but Rust uses milliseconds, so convert. `Math.round`
    // already yields a whole number of milliseconds.
    const serverModifiedMS = Math.round(serverModifiedTime * 1000);
    await this._bridge.setUploaded(serverModifiedMS, succeeded);
  },

  /** Not supported for bridged engines; always throws. */
  async _createTombstone() {
    throw new Error("Bridged engines don't support weak uploads");
  },

  /**
   * Returns the record captured for `id` in the current changeset.
   *
   * @throws {TypeError} If `id` has no entry in `_modified.changes`.
   */
  async _createRecord(id) {
    const change = this._modified.changes[id];
    if (change) {
      return change.record;
    }
    throw new TypeError("Can't create record for unchanged item");
  },

  /** Runs the base client reset, then resets the bridge. */
  async _resetClient() {
    await super._resetClient();
    await this._bridge.reset();
  },
};
// Chain to `SyncEngine.prototype` after the fact; the `super` calls above
// resolve through this link at call time.
Object.setPrototypeOf(BridgedEngine.prototype, SyncEngine.prototype);

/**
 * Maps an error code and message reported through a bridge callback onto a JS
 * error object.
 *
 * @param  {*}      code    The reported error code.
 * @param  {String} message The reported error message.
 * @return {Error} An `InterruptedError` when `code` is exactly
 *                 `Cr.NS_ERROR_ABORT`; otherwise a `BridgeError` carrying the
 *                 code.
 */
function transformError(code, message) {
  if (code === Cr.NS_ERROR_ABORT) {
    return new InterruptedError(message);
  }
  return new BridgeError(code, message);
}