/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ const CRYPTO_COLLECTION = "crypto"; const KEYS_WBO = "keys"; import { Log } from "resource://gre/modules/Log.sys.mjs"; import { DEFAULT_DOWNLOAD_BATCH_SIZE, DEFAULT_KEYBUNDLE_NAME, } from "resource://services-sync/constants.sys.mjs"; import { BulkKeyBundle } from "resource://services-sync/keys.sys.mjs"; import { Weave } from "resource://services-sync/main.sys.mjs"; import { Resource } from "resource://services-sync/resource.sys.mjs"; import { Utils } from "resource://services-sync/util.sys.mjs"; import { Async } from "resource://services-common/async.sys.mjs"; import { CommonUtils } from "resource://services-common/utils.sys.mjs"; import { CryptoUtils } from "resource://services-crypto/utils.sys.mjs"; /** * The base class for all Sync basic storage objects (BSOs). This is the format * used to store all records on the Sync server. In an earlier version of the * Sync protocol, BSOs used to be called WBOs, or Weave Basic Objects. This * class retains the old name. * * @class * @param {String} collection The collection name for this BSO. * @param {String} id The ID of this BSO. */ export function WBORecord(collection, id) { this.data = {}; this.payload = {}; this.collection = collection; // Optional. this.id = id; // Optional. } WBORecord.prototype = { _logName: "Sync.Record.WBO", get sortindex() { if (this.data.sortindex) { return this.data.sortindex; } return 0; }, // Get thyself from your URI, then deserialize. // Set thine 'response' field. async fetch(resource) { if (!(resource instanceof Resource)) { throw new Error("First argument must be a Resource instance."); } let r = await resource.get(); if (r.success) { this.deserialize(r.obj); // Warning! Muffles exceptions! } this.response = r; return this; }, upload(resource) { if (!(resource instanceof Resource)) { throw new Error("First argument must be a Resource instance."); } return resource.put(this); }, // Take a base URI string, with trailing slash, and return the URI of this // WBO based on collection and ID. uri(base) { if (this.collection && this.id) { let url = CommonUtils.makeURI(base + this.collection + "/" + this.id); url.QueryInterface(Ci.nsIURL); return url; } return null; }, deserialize: function deserialize(json) { if (!json || typeof json !== "object") { throw new TypeError("Can't deserialize record from: " + json); } this.data = json; try { // The payload is likely to be JSON, but if not, keep it as a string this.payload = JSON.parse(this.payload); } catch (ex) {} }, toJSON: function toJSON() { // Copy fields from data to be stringified, making sure payload is a string let obj = {}; for (let [key, val] of Object.entries(this.data)) { obj[key] = key == "payload" ? JSON.stringify(val) : val; } if (this.ttl) { obj.ttl = this.ttl; } return obj; }, toString: function toString() { return ( "{ " + "id: " + this.id + " " + "index: " + this.sortindex + " " + "modified: " + this.modified + " " + "ttl: " + this.ttl + " " + "payload: " + JSON.stringify(this.payload) + " }" ); }, }; Utils.deferGetSet(WBORecord, "data", [ "id", "modified", "sortindex", "payload", ]); /** * An encrypted BSO record. This subclass handles encrypting and decrypting the * BSO payload, but doesn't parse or interpret the cleartext string. Subclasses * must override `transformBeforeEncrypt` and `transformAfterDecrypt` to process * the cleartext. * * This class is only exposed for bridged engines, which handle serialization * and deserialization in Rust. Sync engines implemented in JS should subclass * `CryptoWrapper` instead, which takes care of transforming the cleartext into * an object, and ensuring its contents are valid. * * @class * @template Cleartext * @param {String} collection The collection name for this BSO. * @param {String} id The ID of this BSO. */ export function RawCryptoWrapper(collection, id) { // Setting properties before calling the superclass constructor isn't allowed // in new-style classes (`class MyRecord extends RawCryptoWrapper`), but // allowed with plain functions. This is also why `defaultCleartext` is a // method, and not simply set in the subclass constructor. this.cleartext = this.defaultCleartext(); WBORecord.call(this, collection, id); this.ciphertext = null; } RawCryptoWrapper.prototype = { _logName: "Sync.Record.RawCryptoWrapper", /** * Returns the default empty cleartext for this record type. This is exposed * as a method so that subclasses can override it, and access the default * cleartext in their constructors. `CryptoWrapper`, for example, overrides * this to return an empty object, so that initializing the `id` in its * constructor calls its overridden `id` setter. * * @returns {Cleartext} An empty cleartext. */ defaultCleartext() { return null; }, /** * Transforms the cleartext into a string that can be encrypted and wrapped * in a BSO payload. This is called before uploading the record to the server. * * @param {Cleartext} outgoingCleartext The cleartext to upload. * @returns {String} The serialized cleartext. */ transformBeforeEncrypt(outgoingCleartext) { throw new TypeError("Override to stringify outgoing records"); }, /** * Transforms an incoming cleartext string into an instance of the * `Cleartext` type. This is called when fetching the record from the * server. * * @param {String} incomingCleartext The decrypted cleartext string. * @returns {Cleartext} The parsed cleartext. */ transformAfterDecrypt(incomingCleartext) { throw new TypeError("Override to parse incoming records"); }, ciphertextHMAC: async function ciphertextHMAC(keyBundle) { let hmacKeyByteString = keyBundle.hmacKey; if (!hmacKeyByteString) { throw new Error("Cannot compute HMAC without an HMAC key."); } let hmacKey = CommonUtils.byteStringToArrayBuffer(hmacKeyByteString); // NB: this.ciphertext is a base64-encoded string. For some reason this // implementation computes the HMAC on the encoded value. let data = CommonUtils.byteStringToArrayBuffer(this.ciphertext); let hmac = await CryptoUtils.hmac("SHA-256", hmacKey, data); return CommonUtils.bytesAsHex(CommonUtils.arrayBufferToByteString(hmac)); }, /* * Don't directly use the sync key. Instead, grab a key for this * collection, which is decrypted with the sync key. * * Cache those keys; invalidate the cache if the time on the keys collection * changes, or other auth events occur. * * Optional key bundle overrides the collection key lookup. */ async encrypt(keyBundle) { if (!keyBundle) { throw new Error("A key bundle must be supplied to encrypt."); } this.IV = Weave.Crypto.generateRandomIV(); this.ciphertext = await Weave.Crypto.encrypt( this.transformBeforeEncrypt(this.cleartext), keyBundle.encryptionKeyB64, this.IV ); this.hmac = await this.ciphertextHMAC(keyBundle); this.cleartext = null; }, // Optional key bundle. async decrypt(keyBundle) { if (!this.ciphertext) { throw new Error("No ciphertext: nothing to decrypt?"); } if (!keyBundle) { throw new Error("A key bundle must be supplied to decrypt."); } // Authenticate the encrypted blob with the expected HMAC let computedHMAC = await this.ciphertextHMAC(keyBundle); if (computedHMAC != this.hmac) { Utils.throwHMACMismatch(this.hmac, computedHMAC); } let cleartext = await Weave.Crypto.decrypt( this.ciphertext, keyBundle.encryptionKeyB64, this.IV ); this.cleartext = this.transformAfterDecrypt(cleartext); this.ciphertext = null; return this.cleartext; }, }; Object.setPrototypeOf(RawCryptoWrapper.prototype, WBORecord.prototype); Utils.deferGetSet(RawCryptoWrapper, "payload", ["ciphertext", "IV", "hmac"]); /** * An encrypted BSO record with a JSON payload. All engines implemented in JS * should subclass this class to describe their own record types. * * @class * @param {String} collection The collection name for this BSO. * @param {String} id The ID of this BSO. */ export function CryptoWrapper(collection, id) { RawCryptoWrapper.call(this, collection, id); } CryptoWrapper.prototype = { _logName: "Sync.Record.CryptoWrapper", defaultCleartext() { return {}; }, transformBeforeEncrypt(cleartext) { return JSON.stringify(cleartext); }, transformAfterDecrypt(cleartext) { // Handle invalid data here. Elsewhere we assume that cleartext is an object. let json_result = JSON.parse(cleartext); if (!(json_result && json_result instanceof Object)) { throw new Error( `Decryption failed: result is <${json_result}>, not an object.` ); } // If the payload has an encrypted id ensure it matches the requested record's id. if (json_result.id && json_result.id != this.id) { throw new Error(`Record id mismatch: ${json_result.id} != ${this.id}`); } return json_result; }, cleartextToString() { return JSON.stringify(this.cleartext); }, toString: function toString() { let payload = this.deleted ? "DELETED" : this.cleartextToString(); return ( "{ " + "id: " + this.id + " " + "index: " + this.sortindex + " " + "modified: " + this.modified + " " + "ttl: " + this.ttl + " " + "payload: " + payload + " " + "collection: " + (this.collection || "undefined") + " }" ); }, // The custom setter below masks the parent's getter, so explicitly call it :( get id() { return super.id; }, // Keep both plaintext and encrypted versions of the id to verify integrity set id(val) { super.id = val; this.cleartext.id = val; }, }; Object.setPrototypeOf(CryptoWrapper.prototype, RawCryptoWrapper.prototype); Utils.deferGetSet(CryptoWrapper, "cleartext", "deleted"); /** * An interface and caching layer for records. */ export function RecordManager(service) { this.service = service; this._log = Log.repository.getLogger(this._logName); this._records = {}; } RecordManager.prototype = { _recordType: CryptoWrapper, _logName: "Sync.RecordManager", async import(url) { this._log.trace("Importing record: " + (url.spec ? url.spec : url)); try { // Clear out the last response with empty object if GET fails this.response = {}; this.response = await this.service.resource(url).get(); // Don't parse and save the record on failure if (!this.response.success) { return null; } let record = new this._recordType(url); record.deserialize(this.response.obj); return this.set(url, record); } catch (ex) { if (Async.isShutdownException(ex)) { throw ex; } this._log.debug("Failed to import record", ex); return null; } }, get(url) { // Use a url string as the key to the hash let spec = url.spec ? url.spec : url; if (spec in this._records) { return Promise.resolve(this._records[spec]); } return this.import(url); }, set: function RecordMgr_set(url, record) { let spec = url.spec ? url.spec : url; return (this._records[spec] = record); }, contains: function RecordMgr_contains(url) { if ((url.spec || url) in this._records) { return true; } return false; }, clearCache: function recordMgr_clearCache() { this._records = {}; }, del: function RecordMgr_del(url) { delete this._records[url]; }, }; /** * Keeps track of mappings between collection names ('tabs') and KeyBundles. * * You can update this thing simply by giving it /info/collections. It'll * use the last modified time to bring itself up to date. */ export function CollectionKeyManager(lastModified, default_, collections) { this.lastModified = lastModified || 0; this._default = default_ || null; this._collections = collections || {}; this._log = Log.repository.getLogger("Sync.CollectionKeyManager"); } // TODO: persist this locally as an Identity. Bug 610913. // Note that the last modified time needs to be preserved. CollectionKeyManager.prototype = { /** * Generate a new CollectionKeyManager that has the same attributes * as this one. */ clone() { const newCollections = {}; for (let c in this._collections) { newCollections[c] = this._collections[c]; } return new CollectionKeyManager( this.lastModified, this._default, newCollections ); }, // Return information about old vs new keys: // * same: true if two collections are equal // * changed: an array of collection names that changed. _compareKeyBundleCollections: function _compareKeyBundleCollections(m1, m2) { let changed = []; function process(m1, m2) { for (let k1 in m1) { let v1 = m1[k1]; let v2 = m2[k1]; if (!(v1 && v2 && v1.equals(v2))) { changed.push(k1); } } } // Diffs both ways. process(m1, m2); process(m2, m1); // Return a sorted, unique array. changed.sort(); let last; changed = changed.filter(x => x != last && (last = x)); return { same: !changed.length, changed }; }, get isClear() { return !this._default; }, clear: function clear() { this._log.info("Clearing collection keys..."); this.lastModified = 0; this._collections = {}; this._default = null; }, keyForCollection(collection) { if (collection && this._collections[collection]) { return this._collections[collection]; } return this._default; }, /** * If `collections` (an array of strings) is provided, iterate * over it and generate random keys for each collection. * Create a WBO for the given data. */ _makeWBO(collections, defaultBundle) { let wbo = new CryptoWrapper(CRYPTO_COLLECTION, KEYS_WBO); let c = {}; for (let k in collections) { c[k] = collections[k].keyPairB64; } wbo.cleartext = { default: defaultBundle ? defaultBundle.keyPairB64 : null, collections: c, collection: CRYPTO_COLLECTION, id: KEYS_WBO, }; return wbo; }, /** * Create a WBO for the current keys. */ asWBO(collection, id) { return this._makeWBO(this._collections, this._default); }, /** * Compute a new default key, and new keys for any specified collections. */ async newKeys(collections) { let newDefaultKeyBundle = await this.newDefaultKeyBundle(); let newColls = {}; if (collections) { for (let c of collections) { let b = new BulkKeyBundle(c); await b.generateRandom(); newColls[c] = b; } } return [newDefaultKeyBundle, newColls]; }, /** * Generates new keys, but does not replace our local copy. Use this to * verify an upload before storing. */ async generateNewKeysWBO(collections) { let newDefaultKey, newColls; [newDefaultKey, newColls] = await this.newKeys(collections); return this._makeWBO(newColls, newDefaultKey); }, /** * Create a new default key. * * @returns {BulkKeyBundle} */ async newDefaultKeyBundle() { const key = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME); await key.generateRandom(); return key; }, /** * Create a new default key and store it as this._default, since without one you cannot use setContents. */ async generateDefaultKey() { this._default = await this.newDefaultKeyBundle(); }, /** * Return true if keys are already present for each of the given * collections. */ hasKeysFor(collections) { // We can't use filter() here because sometimes collections is an iterator. for (let collection of collections) { if (!this._collections[collection]) { return false; } } return true; }, /** * Return a new CollectionKeyManager that has keys for each of the * given collections (creating new ones for collections where we * don't already have keys). */ async ensureKeysFor(collections) { const newKeys = Object.assign({}, this._collections); for (let c of collections) { if (newKeys[c]) { continue; // don't replace existing keys } const b = new BulkKeyBundle(c); await b.generateRandom(); newKeys[c] = b; } return new CollectionKeyManager(this.lastModified, this._default, newKeys); }, // Take the fetched info/collections WBO, checking the change // time of the crypto collection. updateNeeded(info_collections) { this._log.info( "Testing for updateNeeded. Last modified: " + this.lastModified ); // No local record of modification time? Need an update. if (!this.lastModified) { return true; } // No keys on the server? We need an update, though our // update handling will be a little more drastic... if (!(CRYPTO_COLLECTION in info_collections)) { return true; } // Otherwise, we need an update if our modification time is stale. return info_collections[CRYPTO_COLLECTION] > this.lastModified; }, // // Set our keys and modified time to the values fetched from the server. // Returns one of three values: // // * If the default key was modified, return true. // * If the default key was not modified, but per-collection keys were, // return an array of such. // * Otherwise, return false -- we were up-to-date. // setContents: function setContents(payload, modified) { let self = this; this._log.info( "Setting collection keys contents. Our last modified: " + this.lastModified + ", input modified: " + modified + "." ); if (!payload) { throw new Error("No payload in CollectionKeyManager.setContents()."); } if (!payload.default) { this._log.warn("No downloaded default key: this should not occur."); this._log.warn("Not clearing local keys."); throw new Error( "No default key in CollectionKeyManager.setContents(). Cannot proceed." ); } // Process the incoming default key. let b = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME); b.keyPairB64 = payload.default; let newDefault = b; // Process the incoming collections. let newCollections = {}; if ("collections" in payload) { this._log.info("Processing downloaded per-collection keys."); let colls = payload.collections; for (let k in colls) { let v = colls[k]; if (v) { let keyObj = new BulkKeyBundle(k); keyObj.keyPairB64 = v; newCollections[k] = keyObj; } } } // Check to see if these are already our keys. let sameDefault = this._default && this._default.equals(newDefault); let collComparison = this._compareKeyBundleCollections( newCollections, this._collections ); let sameColls = collComparison.same; if (sameDefault && sameColls) { self._log.info("New keys are the same as our old keys!"); if (modified) { self._log.info("Bumped local modified time."); self.lastModified = modified; } return false; } // Make sure things are nice and tidy before we set. this.clear(); this._log.info("Saving downloaded keys."); this._default = newDefault; this._collections = newCollections; // Always trust the server. if (modified) { self._log.info("Bumping last modified to " + modified); self.lastModified = modified; } return sameDefault ? collComparison.changed : true; }, async updateContents(syncKeyBundle, storage_keys) { let log = this._log; log.info("Updating collection keys..."); // storage_keys is a WBO, fetched from storage/crypto/keys. // Its payload is the default key, and a map of collections to keys. // We lazily compute the key objects from the strings we're given. let payload; try { payload = await storage_keys.decrypt(syncKeyBundle); } catch (ex) { log.warn("Got exception decrypting storage keys with sync key.", ex); log.info("Aborting updateContents. Rethrowing."); throw ex; } let r = this.setContents(payload, storage_keys.modified); log.info("Collection keys updated."); return r; }, }; export function Collection(uri, recordObj, service) { if (!service) { throw new Error("Collection constructor requires a service."); } Resource.call(this, uri); // This is a bit hacky, but gets the job done. let res = service.resource(uri); this.authenticator = res.authenticator; this._recordObj = recordObj; this._service = service; this._full = false; this._ids = null; this._limit = 0; this._older = 0; this._newer = 0; this._data = []; // optional members used by batch upload operations. this._batch = null; this._commit = false; // Used for batch download operations -- note that this is explicitly an // opaque value and not (necessarily) a number. this._offset = null; } Collection.prototype = { _logName: "Sync.Collection", _rebuildURL: function Coll__rebuildURL() { // XXX should consider what happens if it's not a URL... this.uri.QueryInterface(Ci.nsIURL); let args = []; if (this.older) { args.push("older=" + this.older); } if (this.newer) { args.push("newer=" + this.newer); } if (this.full) { args.push("full=1"); } if (this.sort) { args.push("sort=" + this.sort); } if (this.ids != null) { args.push("ids=" + this.ids); } if (this.limit > 0 && this.limit != Infinity) { args.push("limit=" + this.limit); } if (this._batch) { args.push("batch=" + encodeURIComponent(this._batch)); } if (this._commit) { args.push("commit=true"); } if (this._offset) { args.push("offset=" + encodeURIComponent(this._offset)); } this.uri = this.uri .mutate() .setQuery(args.length ? "?" + args.join("&") : "") .finalize(); }, // get full items get full() { return this._full; }, set full(value) { this._full = value; this._rebuildURL(); }, // Apply the action to a certain set of ids get ids() { return this._ids; }, set ids(value) { this._ids = value; this._rebuildURL(); }, // Limit how many records to get get limit() { return this._limit; }, set limit(value) { this._limit = value; this._rebuildURL(); }, // get only items modified before some date get older() { return this._older; }, set older(value) { this._older = value; this._rebuildURL(); }, // get only items modified since some date get newer() { return this._newer; }, set newer(value) { this._newer = value; this._rebuildURL(); }, // get items sorted by some criteria. valid values: // oldest (oldest first) // newest (newest first) // index get sort() { return this._sort; }, set sort(value) { if (value && value != "oldest" && value != "newest" && value != "index") { throw new TypeError( `Illegal value for sort: "${value}" (should be "oldest", "newest", or "index").` ); } this._sort = value; this._rebuildURL(); }, get offset() { return this._offset; }, set offset(value) { this._offset = value; this._rebuildURL(); }, // Set information about the batch for this request. get batch() { return this._batch; }, set batch(value) { this._batch = value; this._rebuildURL(); }, get commit() { return this._commit; }, set commit(value) { this._commit = value && true; this._rebuildURL(); }, // Similar to get(), but will page through the items `batchSize` at a time, // deferring calling the record handler until we've gotten them all. // // Returns the last response processed, and doesn't run the record handler // on any items if a non-success status is received while downloading the // records (or if a network error occurs). async getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) { let totalLimit = Number(this.limit) || Infinity; if (batchSize <= 0 || batchSize >= totalLimit) { throw new Error("Invalid batch size"); } if (!this.full) { throw new Error("getBatched is unimplemented for guid-only GETs"); } // _onComplete and _onProgress are reset after each `get` by Resource. let { _onComplete, _onProgress } = this; let recordBuffer = []; let resp; try { let lastModifiedTime; this.limit = batchSize; do { this._onProgress = _onProgress; this._onComplete = _onComplete; if (batchSize + recordBuffer.length > totalLimit) { this.limit = totalLimit - recordBuffer.length; } this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset, }); // Actually perform the request resp = await this.get(); if (!resp.success) { recordBuffer = []; break; } for (let json of resp.obj) { let record = new this._recordObj(); record.deserialize(json); recordBuffer.push(record); } // Initialize last modified, or check that something broken isn't happening. let lastModified = resp.headers["x-last-modified"]; if (!lastModifiedTime) { lastModifiedTime = lastModified; this.setHeader("X-If-Unmodified-Since", lastModified); } else if (lastModified != lastModifiedTime) { // Should be impossible -- We'd get a 412 in this case. throw new Error( "X-Last-Modified changed in the middle of a download batch! " + `${lastModified} => ${lastModifiedTime}` ); } // If this is missing, we're finished. this.offset = resp.headers["x-weave-next-offset"]; } while (this.offset && totalLimit > recordBuffer.length); } finally { // Ensure we undo any temporary state so that subsequent calls to get() // or getBatched() work properly. We do this before calling the record // handler so that we can more convincingly pretend to be a normal get() // call. Note: we're resetting these to the values they had before this // function was called. this._limit = totalLimit; this._offset = null; delete this._headers["x-if-unmodified-since"]; this._rebuildURL(); } return { response: resp, records: recordBuffer }; }, // This object only supports posting via the postQueue object. post() { throw new Error( "Don't directly post to a collection - use newPostQueue instead" ); }, newPostQueue(log, timestamp, postCallback) { let poster = (data, headers, batch, commit) => { this.batch = batch; this.commit = commit; for (let [header, value] of headers) { this.setHeader(header, value); } return Resource.prototype.post.call(this, data); }; return new PostQueue( poster, timestamp, this._service.serverConfiguration || {}, log, postCallback ); }, }; Object.setPrototypeOf(Collection.prototype, Resource.prototype); // These are limits for requests provided by the server at the // info/configuration endpoint -- server documentation is available here: // http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html#api-instructions // // All are optional, however we synthesize (non-infinite) default values for the // "max_request_bytes" and "max_record_payload_bytes" options. For the others, // we ignore them (we treat the limit is infinite) if they're missing. // // These are also the only ones that all servers (even batching-disabled // servers) should support, at least once this sync-serverstorage patch is // everywhere https://github.com/mozilla-services/server-syncstorage/pull/74 // // Batching enabled servers also limit the amount of payload data and number // of and records we can send in a single post as well as in the whole batch. // Note that the byte limits for these there are just with respect to the // *payload* data, e.g. the data appearing in the payload property (a // string) of the object. // // Note that in practice, these limits should be sensible, but the code makes // no assumptions about this. If we hit any of the limits, we perform the // corresponding action (e.g. submit a request, possibly committing the // current batch). const DefaultPostQueueConfig = Object.freeze({ // Number of total bytes allowed in a request max_request_bytes: 260 * 1024, // Maximum number of bytes allowed in the "payload" property of a record. max_record_payload_bytes: 256 * 1024, // The limit for how many bytes worth of data appearing in "payload" // properties are allowed in a single post. max_post_bytes: Infinity, // The limit for the number of records allowed in a single post. max_post_records: Infinity, // The limit for how many bytes worth of data appearing in "payload" // properties are allowed in a batch. (Same as max_post_bytes, but for // batches). max_total_bytes: Infinity, // The limit for the number of records allowed in a single post. (Same // as max_post_records, but for batches). max_total_records: Infinity, }); // Manages a pair of (byte, count) limits for a PostQueue, such as // (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records). class LimitTracker { constructor(maxBytes, maxRecords) { this.maxBytes = maxBytes; this.maxRecords = maxRecords; this.curBytes = 0; this.curRecords = 0; } clear() { this.curBytes = 0; this.curRecords = 0; } canAddRecord(payloadSize) { // The record counts are inclusive, but depending on the version of the // server, the byte counts may or may not be inclusive (See // https://github.com/mozilla-services/server-syncstorage/issues/73). return ( this.curRecords + 1 <= this.maxRecords && this.curBytes + payloadSize < this.maxBytes ); } canNeverAdd(recordSize) { return recordSize >= this.maxBytes; } didAddRecord(recordSize) { if (!this.canAddRecord(recordSize)) { // This is a bug, caller is expected to call canAddRecord first. throw new Error( "LimitTracker.canAddRecord must be checked before adding record" ); } this.curRecords += 1; this.curBytes += recordSize; } } /* A helper to manage the posting of records while respecting the various size limits. This supports the concept of a server-side "batch". The general idea is: * We queue as many records as allowed in memory, then make a single POST. * This first POST (optionally) gives us a batch ID, which we use for all subsequent posts, until... * At some point we hit a batch-maximum, and jump through a few hoops to commit the current batch (ie, all previous POSTs) and start a new one. * Eventually commit the final batch. In most cases we expect there to be exactly 1 batch consisting of possibly multiple POSTs. */ export function PostQueue(poster, timestamp, serverConfig, log, postCallback) { // The "post" function we should use when it comes time to do the post. this.poster = poster; this.log = log; let config = Object.assign({}, DefaultPostQueueConfig, serverConfig); if (!serverConfig.max_request_bytes && serverConfig.max_post_bytes) { // Use max_post_bytes for max_request_bytes if it's missing. Only needed // until server-syncstorage/pull/74 is everywhere, and even then it's // unnecessary if the server limits are configured sanely (there's no // guarantee of -- at least before that is fully deployed) config.max_request_bytes = serverConfig.max_post_bytes; } this.log.trace("new PostQueue config (after defaults): ", config); // The callback we make with the response when we do get around to making the // post (which could be during any of the enqueue() calls or the final flush()) // This callback may be called multiple times and must not add new items to // the queue. // The second argument passed to this callback is a boolean value that is true // if we're in the middle of a batch, and false if either the batch is // complete, or it's a post to a server that does not understand batching. this.postCallback = postCallback; // Tracks the count and combined payload size for the records we've queued // so far but are yet to POST. this.postLimits = new LimitTracker( config.max_post_bytes, config.max_post_records ); // As above, but for the batch size. this.batchLimits = new LimitTracker( config.max_total_bytes, config.max_total_records ); // Limit for the size of `this.queued` before we do a post. this.maxRequestBytes = config.max_request_bytes; // Limit for the size of incoming record payloads. this.maxPayloadBytes = config.max_record_payload_bytes; // The string where we are capturing the stringified version of the records // queued so far. It will always be invalid JSON as it is always missing the // closing bracket. It's also used to track whether or not we've gone past // maxRequestBytes. this.queued = ""; // The ID of our current batch. Can be undefined (meaning we are yet to make // the first post of a patch, so don't know if we have a batch), null (meaning // we've made the first post but the server response indicated no batching // semantics), otherwise we have made the first post and it holds the batch ID // returned from the server. this.batchID = undefined; // Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET. this.lastModified = timestamp; } PostQueue.prototype = { async enqueue(record) { // We want to ensure the record has a .toJSON() method defined - even // though JSON.stringify() would implicitly call it, the stringify might // still work even if it isn't defined, which isn't what we want. let jsonRepr = record.toJSON(); if (!jsonRepr) { throw new Error( "You must only call this with objects that explicitly support JSON" ); } let bytes = JSON.stringify(jsonRepr); // We use the payload size for the LimitTrackers, since that's what the // byte limits other than max_request_bytes refer to. let payloadLength = jsonRepr.payload.length; // The `+ 2` is to account for the 2-byte (maximum) overhead (one byte for // the leading comma or "[", which all records will have, and the other for // the final trailing "]", only present for the last record). let encodedLength = bytes.length + 2; // Check first if there's some limit that indicates we cannot ever enqueue // this record. let isTooBig = this.postLimits.canNeverAdd(payloadLength) || this.batchLimits.canNeverAdd(payloadLength) || encodedLength >= this.maxRequestBytes || payloadLength >= this.maxPayloadBytes; if (isTooBig) { return { enqueued: false, error: new Error("Single record too large to submit to server"), }; } let canPostRecord = this.postLimits.canAddRecord(payloadLength); let canBatchRecord = this.batchLimits.canAddRecord(payloadLength); let canSendRecord = this.queued.length + encodedLength < this.maxRequestBytes; if (!canPostRecord || !canBatchRecord || !canSendRecord) { this.log.trace("PostQueue flushing: ", { canPostRecord, canSendRecord, canBatchRecord, }); // We need to write the queue out before handling this one, but we only // commit the batch (and thus start a new one) if the record couldn't fit // inside the batch. await this.flush(!canBatchRecord); } this.postLimits.didAddRecord(payloadLength); this.batchLimits.didAddRecord(payloadLength); // Either a ',' or a '[' depending on whether this is the first record. this.queued += this.queued.length ? "," : "["; this.queued += bytes; return { enqueued: true }; }, async flush(finalBatchPost) { if (!this.queued) { // nothing queued - we can't be in a batch, and something has gone very // bad if we think we are. if (this.batchID) { throw new Error( `Flush called when no queued records but we are in a batch ${this.batchID}` ); } return; } // the batch query-param and headers we'll send. let batch; let headers = []; if (this.batchID === undefined) { // First commit in a (possible) batch. batch = "true"; } else if (this.batchID) { // We have an existing batch. batch = this.batchID; } else { // Not the first post and we know we have no batch semantics. batch = null; } headers.push(["x-if-unmodified-since", this.lastModified]); let numQueued = this.postLimits.curRecords; this.log.info( `Posting ${numQueued} records of ${ this.queued.length + 1 } bytes with batch=${batch}` ); let queued = this.queued + "]"; if (finalBatchPost) { this.batchLimits.clear(); } this.postLimits.clear(); this.queued = ""; let response = await this.poster( queued, headers, batch, !!(finalBatchPost && this.batchID !== null) ); if (!response.success) { this.log.trace("Server error response during a batch", response); // not clear what we should do here - we expect the consumer of this to // abort by throwing in the postCallback below. await this.postCallback(this, response, !finalBatchPost); return; } if (finalBatchPost) { this.log.trace("Committed batch", this.batchID); this.batchID = undefined; // we are now in "first post for the batch" state. this.lastModified = response.headers["x-last-modified"]; await this.postCallback(this, response, false); return; } if (response.status != 202) { if (this.batchID) { throw new Error( "Server responded non-202 success code while a batch was in progress" ); } this.batchID = null; // no batch semantics are in place. this.lastModified = response.headers["x-last-modified"]; await this.postCallback(this, response, false); return; } // this response is saying the server has batch semantics - we should // always have a batch ID in the response. let responseBatchID = response.obj.batch; this.log.trace("Server responsed 202 with batch", responseBatchID); if (!responseBatchID) { this.log.error( "Invalid server response: 202 without a batch ID", response ); throw new Error("Invalid server response: 202 without a batch ID"); } if (this.batchID === undefined) { this.batchID = responseBatchID; if (!this.lastModified) { this.lastModified = response.headers["x-last-modified"]; if (!this.lastModified) { throw new Error("Batch response without x-last-modified"); } } } if (this.batchID != responseBatchID) { throw new Error( `Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}` ); } await this.postCallback(this, response, true); }, };