1
0
Fork 0
firefox/toolkit/components/telemetry/dap/DAPTelemetrySender.sys.mjs
Daniel Baumann 5e9a113729
Adding upstream version 140.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-25 09:37:52 +02:00

386 lines
12 KiB
JavaScript

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
import { HPKEConfigManager } from "resource://gre/modules/HPKEConfigManager.sys.mjs";
let lazy = {};
ChromeUtils.defineLazyGetter(lazy, "logConsole", function () {
return console.createInstance({
prefix: "DAPTelemetrySender",
maxLogLevelPref: "toolkit.telemetry.dap.logLevel",
});
});
ChromeUtils.defineESModuleGetters(lazy, {
AsyncShutdown: "resource://gre/modules/AsyncShutdown.sys.mjs",
NimbusFeatures: "resource://nimbus/ExperimentAPI.sys.mjs",
setTimeout: "resource://gre/modules/Timer.sys.mjs",
ObliviousHTTP: "resource://gre/modules/ObliviousHTTP.sys.mjs",
});
XPCOMUtils.defineLazyPreferenceGetter(
lazy,
"gTelemetryEnabled",
"datareporting.healthreport.uploadEnabled",
false
);
XPCOMUtils.defineLazyPreferenceGetter(
lazy,
"gDapEndpoint",
"toolkit.telemetry.dap.leader.url"
);
XPCOMUtils.defineLazyPreferenceGetter(
lazy,
"gLeaderHpke",
"toolkit.telemetry.dap.leader.hpke"
);
XPCOMUtils.defineLazyPreferenceGetter(
lazy,
"gHelperHpke",
"toolkit.telemetry.dap.helper.hpke"
);
/**
* The purpose of this singleton is to handle sending of DAP telemetry data.
* The current DAP draft standard is available here:
* https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap
*
* The specific purpose of this singleton is to make the necessary calls to fetch to do networking.
*/
export const DAPTelemetrySender = new (class {
/**
* @typedef { 'sum' | 'sumvec' | 'histogram' } VDAF
*/
/**
* Task configuration must match a configured task on the DAP server.
*
* @typedef {object} Task
* @property {string} id - The task ID in urlsafe_base64 encoding.
* @property {VDAF} vdaf - The VDAF used by the task.
* @property {number} [bits] - The bit-width of integers in sum/sumvec measurements.
* @property {number} [length] - The number of vector/histogram elements.
* @property {number} time_precision - The rounding granularity in seconds
* that is applied to timestamps attached
* to the report.
*/
async startup() {
if (
Services.startup.isInOrBeyondShutdownPhase(
Ci.nsIAppStartup.SHUTDOWN_PHASE_APPSHUTDOWNCONFIRMED
)
) {
lazy.logConsole.warn(
"DAPTelemetrySender startup not possible due to shutdown."
);
return;
}
// Note that this can block until the ExperimentAPI is available.
// This is fine as we depend on it. In case of a race with shutdown
// it will reject, making the below getVariable calls return null.
await lazy.NimbusFeatures.dapTelemetry.ready();
if (
lazy.NimbusFeatures.dapTelemetry.getVariable("enabled") &&
lazy.NimbusFeatures.dapTelemetry.getVariable("task1Enabled")
) {
let tasks = [];
lazy.logConsole.debug("Task 1 is enabled.");
let task1_id =
lazy.NimbusFeatures.dapTelemetry.getVariable("task1TaskId");
if (task1_id !== undefined && task1_id != "") {
let task = {
// this is testing task 1
id: task1_id,
vdaf: "sumvec",
bits: 8,
length: 20,
time_precision: 300,
};
tasks.push(task);
lazy.setTimeout(
() => this.timedSendTestReports(tasks),
this.timeout_value()
);
lazy.NimbusFeatures.dapTelemetry.onUpdate(async () => {
if (typeof this.counters !== "undefined") {
await this.sendTestReports(tasks, { reason: "nimbus-update" });
}
});
}
this._asyncShutdownBlocker = async () => {
lazy.logConsole.debug(`Sending on shutdown.`);
// Shorter timeout to prevent crashing due to blocking shutdown
await this.sendTestReports(tasks, {
timeout: 2_000,
reason: "shutdown",
});
};
lazy.AsyncShutdown.appShutdownConfirmed.addBlocker(
"DAPTelemetrySender: sending data",
this._asyncShutdownBlocker
);
}
}
async sendTestReports(tasks, options = {}) {
for (let task of tasks) {
let measurement;
if (task.vdaf == "sum") {
measurement = 3;
} else if (task.vdaf == "sumvec") {
measurement = new Array(20).fill(0);
let r = Math.floor(Math.random() * 10);
measurement[r] += 1;
measurement[19] += 1;
} else if (task.vdaf == "histogram") {
measurement = Math.floor(Math.random() * 15);
} else {
throw new Error(`Unknown VDAF ${task.vdaf}`);
}
await this.sendDAPMeasurement(task, measurement, options);
}
}
async timedSendTestReports(tasks) {
lazy.logConsole.debug("Sending on timer.");
await this.sendTestReports(tasks);
lazy.setTimeout(
() => this.timedSendTestReports(tasks),
this.timeout_value()
);
}
timeout_value() {
const MINUTE = 60 * 1000;
return MINUTE * (9 + Math.random() * 2); // 9 - 11 minutes
}
/**
* Internal testing function to verify the DAP aggregator keys match current
* values advertised by servers.
*/
async checkHpkeKeys() {
async function check_key(url, expected) {
let response = await fetch(url + "/hpke_config");
let body = await response.arrayBuffer();
let actual = ChromeUtils.base64URLEncode(body, { pad: false });
if (actual != expected) {
throw new Error(`HPKE for ${url} does not match`);
}
}
await Promise.allSettled([
await check_key(
Services.prefs.getStringPref("toolkit.telemetry.dap.leader.url"),
Services.prefs.getStringPref("toolkit.telemetry.dap.leader.hpke")
),
await check_key(
Services.prefs.getStringPref("toolkit.telemetry.dap.helper.url"),
Services.prefs.getStringPref("toolkit.telemetry.dap.helper.hpke")
),
]);
}
/**
* Creates a DAP report for a specific task from a measurement and sends it.
*
* @param {Task} task
* Definition of the task for which the measurement was taken.
* @param {number|Array<Number>} measurement
* The measured value for which a report is generated.
* @param {object} options
* @param {number} options.timeout
* The timeout for request in milliseconds. Defaults to 30s.
* @param {string} options.reason
* A string to indicate the reason for triggering a submission. This is
* currently ignored and not recorded.
* @param {string} options.ohttp_relay
* @param {Uint8Array} options.ohttp_hpke
* If an OHTTP relay is specified, the reports are uploaded over OHTTP.
*/
async sendDAPMeasurement(task, measurement, options = {}) {
try {
const controller = new AbortController();
lazy.setTimeout(() => controller.abort(), options.timeout ?? 30_000);
let keys = {
leader_hpke: HPKEConfigManager.decodeKey(lazy.gLeaderHpke),
helper_hpke: HPKEConfigManager.decodeKey(lazy.gHelperHpke),
};
let report = this.generateReport(task, measurement, keys);
await this.sendReport(
lazy.gDapEndpoint,
task.id,
report,
controller.signal,
options
);
} catch (e) {
if (e.name === "AbortError") {
lazy.logConsole.error("Aborted DAP report generation: ", e);
} else {
lazy.logConsole.error("DAP report generation failed: " + e);
}
throw e;
}
}
/*
* @typedef {object} AggregatorKeys
* @property {Uint8Array} leader_hpke - The leader's DAP HPKE key.
* @property {Uint8Array} helper_hpke - The helper's DAP HPKE key.
*/
/**
* Generates the encrypted DAP report.
*
* @param {Task} task
* Definition of the task for which the measurement was taken.
* @param {number|Array<number>} measurement
* The measured value for which a report is generated.
* @param {AggregatorKeys} keys
* The DAP encryption keys for each aggregator.
*
* @returns {ArrayBuffer} The generated binary report data.
*/
generateReport(task, measurement, keys) {
let task_id = new Uint8Array(
ChromeUtils.base64URLDecode(task.id, { padding: "ignore" })
);
let reportOut = {};
if (task.vdaf === "sum") {
Services.DAPTelemetry.GetReportPrioSum(
keys.leader_hpke,
keys.helper_hpke,
measurement,
task_id,
task.bits,
task.time_precision,
reportOut
);
} else if (task.vdaf === "sumvec") {
if (measurement.length != task.length) {
throw new Error(
"Measurement vector length doesn't match task configuration"
);
}
Services.DAPTelemetry.GetReportPrioSumVec(
keys.leader_hpke,
keys.helper_hpke,
measurement,
task_id,
task.bits,
task.time_precision,
reportOut
);
} else if (task.vdaf === "histogram") {
Services.DAPTelemetry.GetReportPrioHistogram(
keys.leader_hpke,
keys.helper_hpke,
measurement,
task_id,
task.length,
task.time_precision,
reportOut
);
} else {
throw new Error(
`Unknown measurement type for task ${task.id}: ${task.vdaf} ${task.bits}`
);
}
return new Uint8Array(reportOut.value).buffer;
}
/**
* Sends a report to the leader.
*
* @param {string} leader_endpoint
* The URL for the leader.
* @param {string} task_id
* Base64 encoded task_id as it appears in the upload path.
* @param {ArrayBuffer} report
* Raw bytes of the TLS encoded report.
* @param {AbortSignal} abortSignal
* Can be used to cancel network requests. Does not cancel computation.
* @param {object} options
* @param {string} options.ohttp_relay
* @param {Uint8Array} options.ohttp_hpke
* If an OHTTP relay is specified, the reports are uploaded over OHTTP. In
* this case, the OHTTP and DAP keys must be provided and this code will not
* attempt to fetch them.
*
* @returns Promise
* @resolves {undefined} Once the attempt to send the report completes, whether or not it was successful.
*/
async sendReport(leader_endpoint, task_id, report, abortSignal, options) {
// If telemetry disabled, don't upload DAP reports either.
if (!lazy.gTelemetryEnabled) {
return;
}
const upload_path = leader_endpoint + "/tasks/" + task_id + "/reports";
try {
let requestOptions = {
method: "PUT",
headers: { "Content-Type": "application/dap-report" },
body: report,
signal: abortSignal,
};
let response;
if (options.ohttp_relay) {
response = await lazy.ObliviousHTTP.ohttpRequest(
options.ohttp_relay,
options.ohttp_hpke,
upload_path,
requestOptions
);
} else {
response = await fetch(upload_path, requestOptions);
}
if (response.status != 200) {
const content_type = response.headers.get("content-type");
if (content_type && content_type === "application/json") {
// A JSON error from the DAP server.
let error = await response.json();
throw new Error(
`Sending failed. HTTP response: ${response.status} ${response.statusText}. Error: ${error.type} ${error.title}`
);
} else {
// A different error, e.g. from a load-balancer.
let error = await response.text();
throw new Error(
`Sending failed. HTTP response: ${response.status} ${response.statusText}. Error: ${error}`
);
}
} else {
lazy.logConsole.debug("DAP report sent");
}
} catch (err) {
if (err.name === "AbortError") {
lazy.logConsole.error("Aborted DAP report sending: ", err);
} else {
lazy.logConsole.error("Failed to send report: ", err);
}
throw err;
}
}
})();