diff options
Diffstat (limited to '')
-rw-r--r-- | services/sync/modules/telemetry.js | 1111 |
1 files changed, 1111 insertions, 0 deletions
diff --git a/services/sync/modules/telemetry.js b/services/sync/modules/telemetry.js new file mode 100644 index 0000000000..12150ade91 --- /dev/null +++ b/services/sync/modules/telemetry.js @@ -0,0 +1,1111 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +var EXPORTED_SYMBOLS = ["SyncTelemetry"]; + +// Support for Sync-and-FxA-related telemetry, which is submitted in a special-purpose +// telemetry ping called the "sync ping", documented here: +// +// ../../../toolkit/components/telemetry/docs/data/sync-ping.rst +// +// The sync ping contains identifiers that are linked to the user's Firefox Account +// and are separate from the main telemetry client_id, so this file is also responsible +// for ensuring that we can delete those pings upon user request, by plumbing its +// identifiers into the "deletion-request" ping. + +const { XPCOMUtils } = ChromeUtils.import( + "resource://gre/modules/XPCOMUtils.jsm" +); + +XPCOMUtils.defineLazyModuleGetters(this, { + Async: "resource://services-common/async.js", + AuthenticationError: "resource://services-sync/browserid_identity.js", + fxAccounts: "resource://gre/modules/FxAccounts.jsm", + FxAccounts: "resource://gre/modules/FxAccounts.jsm", + Log: "resource://gre/modules/Log.jsm", + ObjectUtils: "resource://gre/modules/ObjectUtils.jsm", + Observers: "resource://services-common/observers.js", + OS: "resource://gre/modules/osfile.jsm", + Resource: "resource://services-sync/resource.js", + Services: "resource://gre/modules/Services.jsm", + Status: "resource://services-sync/status.js", + Svc: "resource://services-sync/util.js", + TelemetryController: "resource://gre/modules/TelemetryController.jsm", + TelemetryEnvironment: "resource://gre/modules/TelemetryEnvironment.jsm", + TelemetryUtils: "resource://gre/modules/TelemetryUtils.jsm", + Weave: "resource://services-sync/main.js", +}); + +let constants = {}; +ChromeUtils.import("resource://services-sync/constants.js", constants); + +XPCOMUtils.defineLazyServiceGetter( + this, + "Telemetry", + "@mozilla.org/base/telemetry;1", + "nsITelemetry" +); + +XPCOMUtils.defineLazyGetter( + this, + "WeaveService", + () => Cc["@mozilla.org/weave/service;1"].getService().wrappedJSObject +); +const log = Log.repository.getLogger("Sync.Telemetry"); + +const TOPICS = [ + // For tracking change to account/device identifiers. + "fxaccounts:new_device_id", + "fxaccounts:onlogout", + "weave:service:ready", + "weave:service:login:change", + + // For whole-of-sync metrics. + "weave:service:sync:start", + "weave:service:sync:finish", + "weave:service:sync:error", + + // For individual engine metrics. + "weave:engine:sync:start", + "weave:engine:sync:finish", + "weave:engine:sync:error", + "weave:engine:sync:applied", + "weave:engine:sync:step", + "weave:engine:sync:uploaded", + "weave:engine:validate:finish", + "weave:engine:validate:error", + + // For ad-hoc telemetry events. + "weave:telemetry:event", + "weave:telemetry:histogram", + "fxa:telemetry:event", + + "weave:telemetry:migration", +]; + +const PING_FORMAT_VERSION = 1; + +const EMPTY_UID = "0".repeat(32); + +// The set of engines we record telemetry for - any other engines are ignored. +const ENGINES = new Set([ + "addons", + "bookmarks", + "clients", + "forms", + "history", + "passwords", + "prefs", + "tabs", + "extension-storage", + "addresses", + "creditcards", +]); + +// A regex we can use to replace the profile dir in error messages. We use a +// regexp so we can simply replace all case-insensitive occurences. +// This escaping function is from: +// https://developer.mozilla.org/en/docs/Web/JavaScript/Guide/Regular_Expressions +const reProfileDir = new RegExp( + OS.Constants.Path.profileDir.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"), + "gi" +); + +function tryGetMonotonicTimestamp() { + try { + return Telemetry.msSinceProcessStart(); + } catch (e) { + log.warn("Unable to get a monotonic timestamp!"); + return -1; + } +} + +function timeDeltaFrom(monotonicStartTime) { + let now = tryGetMonotonicTimestamp(); + if (monotonicStartTime !== -1 && now !== -1) { + return Math.round(now - monotonicStartTime); + } + return -1; +} + +// Converts extra integer fields to strings, rounds floats to three +// decimal places (nanosecond precision for timings), and removes profile +// directory paths and URLs from potential error messages. +function normalizeExtraTelemetryFields(extra) { + let result = {}; + for (let key in extra) { + let value = extra[key]; + let type = typeof value; + if (type == "string") { + result[key] = cleanErrorMessage(value); + } else if (type == "number") { + result[key] = Number.isInteger(value) + ? value.toString(10) + : value.toFixed(3); + } else if (type != "undefined") { + throw new TypeError( + `Invalid type ${type} for extra telemetry field ${key}` + ); + } + } + return ObjectUtils.isEmpty(result) ? undefined : result; +} + +// This function validates the payload of a telemetry "event" - this can be +// removed once there are APIs available for the telemetry modules to collect +// these events (bug 1329530) - but for now we simulate that planned API as +// best we can. +function validateTelemetryEvent(eventDetails) { + let { object, method, value, extra } = eventDetails; + // Do do basic validation of the params - everything except "extra" must + // be a string. method and object are required. + if ( + typeof method != "string" || + typeof object != "string" || + (value && typeof value != "string") || + (extra && typeof extra != "object") + ) { + log.warn("Invalid event parameters - wrong types", eventDetails); + return false; + } + // length checks. + if ( + method.length > 20 || + object.length > 20 || + (value && value.length > 80) + ) { + log.warn("Invalid event parameters - wrong lengths", eventDetails); + return false; + } + + // extra can be falsey, or an object with string names and values. + if (extra) { + if (Object.keys(extra).length > 10) { + log.warn("Invalid event parameters - too many extra keys", eventDetails); + return false; + } + for (let [ename, evalue] of Object.entries(extra)) { + if ( + typeof ename != "string" || + ename.length > 15 || + typeof evalue != "string" || + evalue.length > 85 + ) { + log.warn( + `Invalid event parameters: extra item "${ename} is invalid`, + eventDetails + ); + return false; + } + } + } + return true; +} + +class EngineRecord { + constructor(name) { + // startTime is in ms from process start, but is monotonic (unlike Date.now()) + // so we need to keep both it and when. + this.startTime = tryGetMonotonicTimestamp(); + this.name = name; + + // This allows cases like bookmarks-buffered to have a separate name from + // the bookmarks engine. + let engineImpl = Weave.Service.engineManager.get(name); + if (engineImpl && engineImpl.overrideTelemetryName) { + this.overrideTelemetryName = engineImpl.overrideTelemetryName; + } + } + + toJSON() { + let result = { name: this.overrideTelemetryName || this.name }; + let properties = [ + "took", + "status", + "failureReason", + "incoming", + "outgoing", + "validation", + "steps", + ]; + for (let property of properties) { + result[property] = this[property]; + } + return result; + } + + finished(error) { + let took = timeDeltaFrom(this.startTime); + if (took > 0) { + this.took = took; + } + if (error) { + this.failureReason = SyncTelemetry.transformError(error); + } + } + + recordApplied(counts) { + if (this.incoming) { + log.error( + `Incoming records applied multiple times for engine ${this.name}!` + ); + return; + } + if (this.name === "clients" && !counts.failed) { + // ignore successful application of client records + // since otherwise they show up every time and are meaningless. + return; + } + + let incomingData = {}; + let properties = ["applied", "failed", "newFailed", "reconciled"]; + // Only record non-zero properties and only record incoming at all if + // there's at least one property we care about. + for (let property of properties) { + if (counts[property]) { + incomingData[property] = counts[property]; + this.incoming = incomingData; + } + } + } + + recordStep(stepResult) { + let step = { + name: stepResult.name, + }; + if (stepResult.took > 0) { + step.took = Math.round(stepResult.took); + } + if (stepResult.counts) { + let counts = stepResult.counts.filter(({ count }) => count > 0); + if (counts.length) { + step.counts = counts; + } + } + if (this.steps) { + this.steps.push(step); + } else { + this.steps = [step]; + } + } + + recordValidation(validationResult) { + if (this.validation) { + log.error(`Multiple validations occurred for engine ${this.name}!`); + return; + } + let { problems, version, took, checked } = validationResult; + let validation = { + version: version || 0, + checked: checked || 0, + }; + if (took > 0) { + validation.took = Math.round(took); + } + let summarized = problems.filter(({ count }) => count > 0); + if (summarized.length) { + validation.problems = summarized; + } + this.validation = validation; + } + + recordValidationError(e) { + if (this.validation) { + log.error(`Multiple validations occurred for engine ${this.name}!`); + return; + } + + this.validation = { + failureReason: SyncTelemetry.transformError(e), + }; + } + + recordUploaded(counts) { + if (counts.sent || counts.failed) { + if (!this.outgoing) { + this.outgoing = []; + } + this.outgoing.push({ + sent: counts.sent || undefined, + failed: counts.failed || undefined, + }); + } + } +} + +// The record of a single "sync" - typically many of these are submitted in +// a single ping (ie, as a 'syncs' array) +class SyncRecord { + constructor(allowedEngines, why) { + this.allowedEngines = allowedEngines; + // Our failure reason. This property only exists in the generated ping if an + // error actually occurred. + this.failureReason = undefined; + this.syncNodeType = null; + this.when = Date.now(); + this.startTime = tryGetMonotonicTimestamp(); + this.took = 0; // will be set later. + this.why = why; + + // All engines that have finished (ie, does not include the "current" one) + // We omit this from the ping if it's empty. + this.engines = []; + // The engine that has started but not yet stopped. + this.currentEngine = null; + } + + toJSON() { + let result = { + when: this.when, + took: this.took, + failureReason: this.failureReason, + status: this.status, + }; + if (this.why) { + result.why = this.why; + } + let engines = []; + for (let engine of this.engines) { + engines.push(engine.toJSON()); + } + if (engines.length > 0) { + result.engines = engines; + } + return result; + } + + finished(error) { + this.took = timeDeltaFrom(this.startTime); + if (this.currentEngine != null) { + log.error( + "Finished called for the sync before the current engine finished" + ); + this.currentEngine.finished(null); + this.onEngineStop(this.currentEngine.name); + } + if (error) { + this.failureReason = SyncTelemetry.transformError(error); + } + + this.syncNodeType = Weave.Service.identity.telemetryNodeType; + + // Check for engine statuses. -- We do this now, and not in engine.finished + // to make sure any statuses that get set "late" are recorded + for (let engine of this.engines) { + let status = Status.engines[engine.name]; + if (status && status !== constants.ENGINE_SUCCEEDED) { + engine.status = status; + } + } + + let statusObject = {}; + + let serviceStatus = Status.service; + if (serviceStatus && serviceStatus !== constants.STATUS_OK) { + statusObject.service = serviceStatus; + this.status = statusObject; + } + let syncStatus = Status.sync; + if (syncStatus && syncStatus !== constants.SYNC_SUCCEEDED) { + statusObject.sync = syncStatus; + this.status = statusObject; + } + } + + onEngineStart(engineName) { + if (this._shouldIgnoreEngine(engineName, false)) { + return; + } + + if (this.currentEngine) { + log.error( + `Being told that engine ${engineName} has started, but current engine ${this.currentEngine.name} hasn't stopped` + ); + // Just discard the current engine rather than making up data for it. + } + this.currentEngine = new EngineRecord(engineName); + } + + onEngineStop(engineName, error) { + // We only care if it's the current engine if we have a current engine. + if (this._shouldIgnoreEngine(engineName, !!this.currentEngine)) { + return; + } + if (!this.currentEngine) { + // It's possible for us to get an error before the start message of an engine + // (somehow), in which case we still want to record that error. + if (!error) { + return; + } + log.error( + `Error triggered on ${engineName} when no current engine exists: ${error}` + ); + this.currentEngine = new EngineRecord(engineName); + } + this.currentEngine.finished(error); + this.engines.push(this.currentEngine); + this.currentEngine = null; + } + + onEngineApplied(engineName, counts) { + if (this._shouldIgnoreEngine(engineName)) { + return; + } + this.currentEngine.recordApplied(counts); + } + + onEngineStep(engineName, step) { + if (this._shouldIgnoreEngine(engineName)) { + return; + } + this.currentEngine.recordStep(step); + } + + onEngineValidated(engineName, validationData) { + if (this._shouldIgnoreEngine(engineName, false)) { + return; + } + let engine = this.engines.find(e => e.name === engineName); + if ( + !engine && + this.currentEngine && + engineName === this.currentEngine.name + ) { + engine = this.currentEngine; + } + if (engine) { + engine.recordValidation(validationData); + } else { + log.warn( + `Validation event triggered for engine ${engineName}, which hasn't been synced!` + ); + } + } + + onEngineValidateError(engineName, error) { + if (this._shouldIgnoreEngine(engineName, false)) { + return; + } + let engine = this.engines.find(e => e.name === engineName); + if ( + !engine && + this.currentEngine && + engineName === this.currentEngine.name + ) { + engine = this.currentEngine; + } + if (engine) { + engine.recordValidationError(error); + } else { + log.warn( + `Validation failure event triggered for engine ${engineName}, which hasn't been synced!` + ); + } + } + + onEngineUploaded(engineName, counts) { + if (this._shouldIgnoreEngine(engineName)) { + return; + } + this.currentEngine.recordUploaded(counts); + } + + _shouldIgnoreEngine(engineName, shouldBeCurrent = true) { + if (!this.allowedEngines.has(engineName)) { + log.info( + `Notification for engine ${engineName}, but we aren't recording telemetry for it` + ); + return true; + } + if (shouldBeCurrent) { + if (!this.currentEngine || engineName != this.currentEngine.name) { + log.error(`Notification for engine ${engineName} but it isn't current`); + return true; + } + } + return false; + } +} + +function cleanErrorMessage(error) { + // There's a chance the profiledir is in the error string which is PII we + // want to avoid including in the ping. + error = error.replace(reProfileDir, "[profileDir]"); + // MSG_INVALID_URL from /dom/bindings/Errors.msg -- no way to access this + // directly from JS. + if (error.endsWith("is not a valid URL.")) { + error = "<URL> is not a valid URL."; + } + // Try to filter things that look somewhat like a URL (in that they contain a + // colon in the middle of non-whitespace), in case anything else is including + // these in error messages. Note that JSON.stringified stuff comes through + // here, so we explicitly ignore double-quotes as well. + error = error.replace(/[^\s"]+:[^\s"]+/g, "<URL>"); + return error; +} + +// The entire "sync ping" - it includes all the syncs, events etc recorded in +// the ping. +class SyncTelemetryImpl { + constructor(allowedEngines) { + log.manageLevelFromPref("services.sync.log.logger.telemetry"); + // This is accessible so we can enable custom engines during tests. + this.allowedEngines = allowedEngines; + this.current = null; + this.setupObservers(); + + this.payloads = []; + this.discarded = 0; + this.events = []; + this.histograms = {}; + this.migrations = []; + this.maxEventsCount = Svc.Prefs.get("telemetry.maxEventsCount", 1000); + this.maxPayloadCount = Svc.Prefs.get("telemetry.maxPayloadCount"); + this.submissionInterval = + Svc.Prefs.get("telemetry.submissionInterval") * 1000; + this.lastSubmissionTime = Telemetry.msSinceProcessStart(); + this.lastUID = EMPTY_UID; + this.lastSyncNodeType = null; + this.currentSyncNodeType = null; + // Note that the sessionStartDate is somewhat arbitrary - the telemetry + // modules themselves just use `new Date()`. This means that our startDate + // isn't going to be the same as the sessionStartDate in the main pings, + // but that's OK for now - if it's a problem we'd need to change the + // telemetry modules to expose what it thinks the sessionStartDate is. + let sessionStartDate = new Date(); + this.sessionStartDate = TelemetryUtils.toLocalTimeISOString( + TelemetryUtils.truncateToHours(sessionStartDate) + ); + TelemetryController.registerSyncPingShutdown(() => this.shutdown()); + } + + sanitizeFxaDeviceId(deviceId) { + return fxAccounts.telemetry.sanitizeDeviceId(deviceId); + } + + prepareFxaDevices(devices) { + // For non-sync users, the data per device is limited -- just an id and a + // type (and not even the id yet). For sync users, if we can correctly map + // the fxaDevice to a sync device, then we can get os and version info, + // which would be quite unfortunate to lose. + let extraInfoMap = new Map(); + if (this.syncIsEnabled()) { + for (let client of this.getClientsEngineRecords()) { + if (client.fxaDeviceId) { + extraInfoMap.set(client.fxaDeviceId, { + os: client.os, + version: client.version, + syncID: this.sanitizeFxaDeviceId(client.id), + }); + } + } + } + // Finally, sanitize and convert to the proper format. + return devices.map(d => { + let { os, version, syncID } = extraInfoMap.get(d.id) || { + os: undefined, + version: undefined, + syncID: undefined, + }; + return { + id: this.sanitizeFxaDeviceId(d.id) || EMPTY_UID, + type: d.type, + os, + version, + syncID, + }; + }); + } + + syncIsEnabled() { + return WeaveService.enabled && WeaveService.ready; + } + + // Separate for testing. + getClientsEngineRecords() { + if (!this.syncIsEnabled()) { + throw new Error("Bug: syncIsEnabled() must be true, check it first"); + } + return Weave.Service.clientsEngine.remoteClients; + } + + updateFxaDevices(devices) { + if (!devices) { + return {}; + } + let me = devices.find(d => d.isCurrentDevice); + let id = me ? this.sanitizeFxaDeviceId(me.id) : undefined; + let cleanDevices = this.prepareFxaDevices(devices); + return { deviceID: id, devices: cleanDevices }; + } + + getFxaDevices() { + return fxAccounts.device.recentDeviceList; + } + + getPingJSON(reason) { + let { devices, deviceID } = this.updateFxaDevices(this.getFxaDevices()); + return { + os: TelemetryEnvironment.currentEnvironment.system.os, + why: reason, + devices, + discarded: this.discarded || undefined, + version: PING_FORMAT_VERSION, + syncs: this.payloads.slice(), + uid: this.lastUID, + syncNodeType: this.lastSyncNodeType || undefined, + deviceID, + sessionStartDate: this.sessionStartDate, + events: this.events.length == 0 ? undefined : this.events, + migrations: this.migrations.length == 0 ? undefined : this.migrations, + histograms: + Object.keys(this.histograms).length == 0 ? undefined : this.histograms, + }; + } + + _addMigrationRecord(type, info) { + log.debug("Saw telemetry migration info", type, info); + // Updates to this need to be documented in `sync-ping.rst` + switch (type) { + case "webext-storage": + this.migrations.push({ + type: "webext-storage", + entries: +info.entries, + entriesSuccessful: +info.entries_successful, + extensions: +info.extensions, + extensionsSuccessful: +info.extensions_successful, + openFailure: !!info.open_failure, + }); + break; + default: + throw new Error("Bug: Unknown migration record type " + type); + } + } + + finish(reason) { + // Note that we might be in the middle of a sync right now, and so we don't + // want to touch this.current. + let result = this.getPingJSON(reason); + this.payloads = []; + this.discarded = 0; + this.events = []; + this.migrations = []; + this.histograms = {}; + this.submit(result); + } + + setupObservers() { + for (let topic of TOPICS) { + Observers.add(topic, this, this); + } + } + + shutdown() { + this.finish("shutdown"); + for (let topic of TOPICS) { + Observers.remove(topic, this, this); + } + } + + submit(record) { + if (!this.isProductionSyncUser()) { + return false; + } + // We still call submit() with possibly illegal payloads so that tests can + // know that the ping was built. We don't end up submitting them, however. + let numEvents = record.events ? record.events.length : 0; + let numMigrations = record.migrations ? record.migrations.length : 0; + if (record.syncs.length || numEvents || numMigrations) { + log.trace( + `submitting ${record.syncs.length} sync record(s) and ` + + `${numEvents} event(s) to telemetry` + ); + TelemetryController.submitExternalPing("sync", record, { + usePingSender: true, + }).catch(err => { + log.error("failed to submit ping", err); + }); + return true; + } + return false; + } + + isProductionSyncUser() { + // If FxA isn't production then we treat sync as not being production. + // Further, there's the deprecated "services.sync.tokenServerURI" pref we + // need to consider - fxa doesn't consider that as if that's the only + // pref set, they *are* running a production fxa, just not production sync. + if ( + !FxAccounts.config.isProductionConfig() || + Services.prefs.prefHasUserValue("services.sync.tokenServerURI") + ) { + log.trace(`Not sending telemetry ping for self-hosted Sync user`); + return false; + } + return true; + } + + onSyncStarted(data) { + const why = data && JSON.parse(data).why; + if (this.current) { + log.warn( + "Observed weave:service:sync:start, but we're already recording a sync!" + ); + // Just discard the old record, consistent with our handling of engines, above. + this.current = null; + } + this.current = new SyncRecord(this.allowedEngines, why); + } + + // We need to ensure that the telemetry `deletion-request` ping always contains the user's + // current sync device ID, because if the user opts out of telemetry then the deletion ping + // will be immediately triggered for sending, and we won't have a chance to fill it in later. + // This keeps the `deletion-ping` up-to-date when the user's account state changes. + onAccountInitOrChange() { + // We don't submit sync pings for self-hosters, so don't need to collect their device ids either. + if (!this.isProductionSyncUser()) { + return; + } + // Awkwardly async, but no need to await. If the user's account state changes while + // this promise is in flight, it will reject and we won't record any data in the ping. + // (And a new notification will trigger us to try again with the new state). + fxAccounts.device + .getLocalId() + .then(deviceId => { + let sanitizedDeviceId = fxAccounts.telemetry.sanitizeDeviceId(deviceId); + // In the past we did not persist the FxA metrics identifiers to disk, + // so this might be missing until we can fetch it from the server for the + // first time. There will be a fresh notification tirggered when it's available. + if (sanitizedDeviceId) { + // Sanitized device ids are 64 characters long, but telemetry limits scalar strings to 50. + // The first 32 chars are sufficient to uniquely identify the device, so just send those. + // It's hard to change the sync ping itself to only send 32 chars, to b/w compat reasons. + sanitizedDeviceId = sanitizedDeviceId.substr(0, 32); + Services.telemetry.scalarSet( + "deletion.request.sync_device_id", + sanitizedDeviceId + ); + } + }) + .catch(err => { + log.warn( + `Failed to set sync identifiers in the deletion-request ping: ${err}` + ); + }); + } + + // This keeps the `deletion-request` ping up-to-date when the user signs out, + // clearing the now-nonexistent sync device id. + onAccountLogout() { + Services.telemetry.scalarSet("deletion.request.sync_device_id", ""); + } + + _checkCurrent(topic) { + if (!this.current) { + log.warn( + `Observed notification ${topic} but no current sync is being recorded.` + ); + return false; + } + return true; + } + + _shouldSubmitForDataChange() { + let newID = fxAccounts.telemetry.getSanitizedUID() || EMPTY_UID; + let oldID = this.lastUID; + if ( + newID != EMPTY_UID && + oldID != EMPTY_UID && + // Both are "real" uids, so we care if they've changed. + newID != oldID + ) { + log.trace( + `shouldSubmitForDataChange - uid from '${oldID}' -> '${newID}'` + ); + return true; + } + // We've gone from knowing one of the ids to not knowing it (which we + // ignore) or we've gone from not knowing it to knowing it (which is fine), + // Now check the node type because a change there also means we should + // submit. + if ( + this.lastSyncNodeType && + this.currentSyncNodeType != this.lastSyncNodeType + ) { + log.trace( + `shouldSubmitForDataChange - nodeType from '${this.lastSyncNodeType}' -> '${this.currentSyncNodeType}'` + ); + return true; + } + log.trace("shouldSubmitForDataChange - no need to submit"); + return false; + } + + maybeSubmitForDataChange() { + if (this._shouldSubmitForDataChange()) { + log.info( + "Early submission of sync telemetry due to changed IDs/NodeType" + ); + this.finish("idchange"); // this actually submits. + this.lastSubmissionTime = Telemetry.msSinceProcessStart(); + } + + // Only update the last UIDs if we actually know them. + let current_uid = fxAccounts.telemetry.getSanitizedUID(); + if (current_uid) { + this.lastUID = current_uid; + } + if (this.currentSyncNodeType) { + this.lastSyncNodeType = this.currentSyncNodeType; + } + } + + maybeSubmitForInterval() { + // We want to submit the ping every `this.submissionInterval` but only when + // there's no current sync in progress, otherwise we may end up submitting + // the sync and the events caused by it in different pings. + if ( + this.current == null && + Telemetry.msSinceProcessStart() - this.lastSubmissionTime > + this.submissionInterval + ) { + this.finish("schedule"); + this.lastSubmissionTime = Telemetry.msSinceProcessStart(); + } + } + + onSyncFinished(error) { + if (!this.current) { + log.warn("onSyncFinished but we aren't recording"); + return; + } + this.current.finished(error); + this.currentSyncNodeType = this.current.syncNodeType; + // We check for "data change" before appending the current sync to payloads, + // as it is the current sync which has the data with the new data, and thus + // must go in the *next* submission. + this.maybeSubmitForDataChange(); + if (this.payloads.length < this.maxPayloadCount) { + this.payloads.push(this.current.toJSON()); + } else { + ++this.discarded; + } + this.current = null; + // If we are submitting due to timing, it's desirable that the most recent + // sync is included, so we check after appending `this.current`. + this.maybeSubmitForInterval(); + } + + _addHistogram(hist) { + let histogram = Telemetry.getHistogramById(hist); + let s = histogram.snapshot(); + this.histograms[hist] = s; + } + + _recordEvent(eventDetails) { + this.maybeSubmitForDataChange(); + + if (this.events.length >= this.maxEventsCount) { + log.warn("discarding event - already queued our maximum", eventDetails); + return; + } + + let { object, method, value, extra } = eventDetails; + if (extra) { + extra = normalizeExtraTelemetryFields(extra); + eventDetails = { object, method, value, extra }; + } + + if (!validateTelemetryEvent(eventDetails)) { + // we've already logged what the problem is... + return; + } + log.debug("recording event", eventDetails); + + if (extra && Resource.serverTime && !extra.serverTime) { + extra.serverTime = String(Resource.serverTime); + } + let category = "sync"; + let ts = Math.floor(tryGetMonotonicTimestamp()); + + // An event record is a simple array with at least 4 items. + let event = [ts, category, method, object]; + // It may have up to 6 elements if |extra| is defined + if (value) { + event.push(value); + if (extra) { + event.push(extra); + } + } else if (extra) { + event.push(null); // a null for the empty value. + event.push(extra); + } + this.events.push(event); + this.maybeSubmitForInterval(); + } + + observe(subject, topic, data) { + log.trace(`observed ${topic} ${data}`); + + switch (topic) { + case "weave:service:ready": + case "weave:service:login:change": + case "fxaccounts:new_device_id": + this.onAccountInitOrChange(); + break; + + case "fxaccounts:onlogout": + this.onAccountLogout(); + break; + + /* sync itself state changes */ + case "weave:service:sync:start": + this.onSyncStarted(data); + break; + + case "weave:service:sync:finish": + if (this._checkCurrent(topic)) { + this.onSyncFinished(null); + } + break; + + case "weave:service:sync:error": + // argument needs to be truthy (this should always be the case) + this.onSyncFinished(subject || "Unknown"); + break; + + /* engine sync state changes */ + case "weave:engine:sync:start": + if (this._checkCurrent(topic)) { + this.current.onEngineStart(data); + } + break; + case "weave:engine:sync:finish": + if (this._checkCurrent(topic)) { + this.current.onEngineStop(data, null); + } + break; + + case "weave:engine:sync:error": + if (this._checkCurrent(topic)) { + // argument needs to be truthy (this should always be the case) + this.current.onEngineStop(data, subject || "Unknown"); + } + break; + + /* engine counts */ + case "weave:engine:sync:applied": + if (this._checkCurrent(topic)) { + this.current.onEngineApplied(data, subject); + } + break; + + case "weave:engine:sync:step": + if (this._checkCurrent(topic)) { + this.current.onEngineStep(data, subject); + } + break; + + case "weave:engine:sync:uploaded": + if (this._checkCurrent(topic)) { + this.current.onEngineUploaded(data, subject); + } + break; + + case "weave:engine:validate:finish": + if (this._checkCurrent(topic)) { + this.current.onEngineValidated(data, subject); + } + break; + + case "weave:engine:validate:error": + if (this._checkCurrent(topic)) { + this.current.onEngineValidateError(data, subject || "Unknown"); + } + break; + + case "weave:telemetry:event": + case "fxa:telemetry:event": + this._recordEvent(subject); + break; + + case "weave:telemetry:histogram": + this._addHistogram(data); + break; + + case "weave:telemetry:migration": + this._addMigrationRecord(data, subject); + break; + + default: + log.warn(`unexpected observer topic ${topic}`); + break; + } + } + + // Transform an exception into a standard description. Exposed here for when + // this module isn't directly responsible for knowing the transform should + // happen (for example, when including an error in the |extra| field of + // event telemetry) + transformError(error) { + // Certain parts of sync will use this pattern as a way to communicate to + // processIncoming to abort the processing. However, there's no guarantee + // this can only happen then. + if (typeof error == "object" && error.code && error.cause) { + error = error.cause; + } + if (Async.isShutdownException(error)) { + return { name: "shutdownerror" }; + } + + if (typeof error === "string") { + if (error.startsWith("error.")) { + // This is hacky, but I can't imagine that it's not also accurate. + return { name: "othererror", error }; + } + error = cleanErrorMessage(error); + return { name: "unexpectederror", error }; + } + + if (error instanceof AuthenticationError) { + return { name: "autherror", from: error.source }; + } + + let httpCode = + error.status || (error.response && error.response.status) || error.code; + + if (httpCode) { + return { name: "httperror", code: httpCode }; + } + + if (error.failureCode) { + return { name: "othererror", error: error.failureCode }; + } + + if (error.result) { + return { name: "nserror", code: error.result }; + } + // It's probably an Error object, but it also could be some + // other object that may or may not override toString to do + // something useful. + let msg = String(error); + if (msg.startsWith("[object")) { + // Nothing useful in the default, check for a string "message" property. + if (typeof error.message == "string") { + msg = String(error.message); + } else { + // Hopefully it won't come to this... + msg = JSON.stringify(error); + } + } + return { + name: "unexpectederror", + error: cleanErrorMessage(msg), + }; + } +} + +var SyncTelemetry = new SyncTelemetryImpl(ENGINES); |