951 lines
34 KiB
JavaScript
951 lines
34 KiB
JavaScript
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
import {
|
|
CLIENT_IS_THUNDERBIRD,
|
|
COMMAND_SENDTAB,
|
|
COMMAND_SENDTAB_TAIL,
|
|
COMMAND_CLOSETAB,
|
|
COMMAND_CLOSETAB_TAIL,
|
|
SCOPE_OLD_SYNC,
|
|
log,
|
|
} from "resource://gre/modules/FxAccountsCommon.sys.mjs";
|
|
|
|
import { clearTimeout, setTimeout } from "resource://gre/modules/Timer.sys.mjs";
|
|
|
|
import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
|
|
|
|
import { Observers } from "resource://services-common/observers.sys.mjs";
|
|
|
|
// Holder for everything this module loads lazily (modules, prefs, services).
const lazy = {};

// ES modules that are only imported on first use.
ChromeUtils.defineESModuleGetters(lazy, {
  AsyncShutdown: "resource://gre/modules/AsyncShutdown.sys.mjs",
  BulkKeyBundle: "resource://services-sync/keys.sys.mjs",
  CryptoWrapper: "resource://services-sync/record.sys.mjs",
  PushCrypto: "resource://gre/modules/PushCrypto.sys.mjs",
  getRemoteCommandStore: "resource://services-sync/TabsStore.sys.mjs",
  RemoteCommand: "resource://services-sync/TabsStore.sys.mjs",
  Utils: "resource://services-sync/util.sys.mjs",
  NimbusFeatures: "resource://nimbus/ExperimentAPI.sys.mjs",
});

// `lazy.INVALID_SHAREABLE_SCHEMES` is the "|"-separated pref value exposed as
// a Set of scheme names; received tabs whose scheme is in it are rejected.
XPCOMUtils.defineLazyPreferenceGetter(
  lazy,
  "INVALID_SHAREABLE_SCHEMES",
  "services.sync.engine.tabs.filteredSchemes",
  "",
  null,
  prefValue => new Set(prefValue.split("|"))
);

// Used to defer the post-startup queue check until the user is idle.
XPCOMUtils.defineLazyServiceGetter(
  lazy,
  "idleService",
  "@mozilla.org/widget/useridleservice;1",
  "nsIUserIdleService"
);

// Observer topic notified after queued close-tab commands have been sent.
const TOPIC_TABS_CHANGED = "services.sync.tabs.changed";
// Size budget handed to `chunkUrls` when splitting close-tab URLs.
const COMMAND_MAX_PAYLOAD_SIZE = 16 * 1024;
|
|
|
|
/**
 * Coordinates FxA device commands for the local device: advertises the
 * commands we can receive, invokes commands on other devices, and polls for
 * and dispatches the commands that were sent to us.
 */
export class FxAccountsCommands {
  /**
   * @param {object} fxAccountsInternal
   *   The FxAccounts internal object. This class uses its
   *   `getUserAccountData`, `withCurrentAccountState`, `fxAccountsClient`
   *   and `device` members.
   */
  constructor(fxAccountsInternal) {
    this._fxai = fxAccountsInternal;
    this.sendTab = new SendTab(this, fxAccountsInternal);
    this.closeTab = new CloseRemoteTab(this, fxAccountsInternal);
    this.commandQueue = new CommandQueue(this, fxAccountsInternal);
    // Epoch milliseconds before which `invoke` refuses to contact the server.
    // Set when the server answers an invoke with a 429.
    this._invokeRateLimitExpiry = 0;
  }

  /**
   * Build the map of commands this device is able to receive.
   *
   * @returns {Promise<object>}
   *   Command name -> encrypted key bundle. A command is omitted when its
   *   keys are not available; the map is empty on Thunderbird.
   */
  async availableCommands() {
    const commands = {};
    if (CLIENT_IS_THUNDERBIRD) {
      return commands;
    }

    // Invalid keys usually means the account is not verified yet.
    const encryptedSendTabKeys = await this.sendTab.getEncryptedCommandKeys();
    if (encryptedSendTabKeys) {
      commands[COMMAND_SENDTAB] = encryptedSendTabKeys;
    }

    const encryptedCloseTabKeys =
      await this.closeTab.getEncryptedCommandKeys();
    if (encryptedCloseTabKeys) {
      commands[COMMAND_CLOSETAB] = encryptedCloseTabKeys;
    }

    return commands;
  }

  /**
   * Ask the FxA server to deliver `command` to `device`.
   *
   * @param {string} command - the command name, eg COMMAND_SENDTAB.
   * @param {object} device - the target device; only `device.id` is used.
   * @param {object} payload - the (already encrypted) command payload.
   * @throws {Error} if we are locally rate-limited, or whatever error the
   *   client raised. A 429 carrying `retryAfter` (seconds) starts a local
   *   rate-limit window before being rethrown.
   */
  async invoke(command, device, payload) {
    // Check the local rate limit first so that a throttled call fails fast
    // without reading account data.
    const now = Date.now();
    if (now < this._invokeRateLimitExpiry) {
      const remaining = (this._invokeRateLimitExpiry - now) / 1000;
      throw new Error(
        `Invoke for ${command} is rate-limited for ${remaining} seconds.`
      );
    }
    const { sessionToken } = await this._fxai.getUserAccountData([
      "sessionToken",
    ]);
    const client = this._fxai.fxAccountsClient;
    try {
      const info = await client.invokeCommand(
        sessionToken,
        command,
        device.id,
        payload
      );
      if (!info.enqueued || !info.notified) {
        // We want an error log here to help diagnose users who report failure.
        log.error("Sending was only partially successful", info);
      } else {
        log.info("Successfully sent", info);
      }
    } catch (err) {
      if (err.code === 429 && err.retryAfter) {
        this._invokeRateLimitExpiry = Date.now() + err.retryAfter * 1000;
      }
      throw err;
    }
    log.info(`Payload sent to device ${device.id}.`);
  }

  /**
   * Poll and handle device commands for the current device.
   * This method can be called either in response to a Push message,
   * or by itself as a "commands recovery" mechanism.
   *
   * @param {Number} notifiedIndex "Command received" push messages include
   * the index of the command that triggered the message. We use it as a
   * hint when we have no "last command index" stored.
   * @returns {Promise<boolean>} always `true` when no error was thrown.
   * @throws {Error} if there is no device registration.
   */
  async pollDeviceCommands(notifiedIndex = 0) {
    // Whether the call to `pollDeviceCommands` was initiated by a Push message from the FxA
    // servers in response to a message being received or simply scheduled in order
    // to fetch missed messages.
    log.info(`Polling device commands.`);
    await this._fxai.withCurrentAccountState(async state => {
      const { device } = await state.getUserAccountData(["device"]);
      if (!device) {
        throw new Error("No device registration.");
      }
      // We increment lastCommandIndex by 1 because the server response includes the current index.
      // If we don't have a `lastCommandIndex` stored, we fall back on the index from the push message we just got.
      // (`undefined + 1` is NaN, which is falsy, hence the `||` fallback.)
      const lastCommandIndex = device.lastCommandIndex + 1 || notifiedIndex;
      // We have already received this message before.
      if (notifiedIndex > 0 && notifiedIndex < lastCommandIndex) {
        return;
      }
      const { index, messages } =
        await this._fetchDeviceCommands(lastCommandIndex);
      if (messages.length) {
        // Persist the new index before handling the commands.
        await state.updateUserAccountData({
          device: { ...device, lastCommandIndex: index },
        });
        log.info(`Handling ${messages.length} messages`);
        await this._handleCommands(messages, notifiedIndex);
      }
    });
    return true;
  }

  /**
   * Fetch pending commands from the server.
   *
   * @param {number} index - passed to the server as the `index` option.
   * @param {?number} limit - optional `limit` option; omitted when null.
   * @returns {Promise<object>} the result of `client.getCommands`.
   * @throws {Error} if there is no signed-in user or no session token.
   */
  async _fetchDeviceCommands(index, limit = null) {
    const userData = await this._fxai.getUserAccountData();
    if (!userData) {
      throw new Error("No user.");
    }
    const { sessionToken } = userData;
    if (!sessionToken) {
      throw new Error("No session token.");
    }
    const client = this._fxai.fxAccountsClient;
    const opts = { index };
    if (limit != null) {
      opts.limit = limit;
    }
    return client.getCommands(sessionToken, opts);
  }

  /**
   * Explain why the command at `messageIndex` is being handled.
   *
   * @param {number} notifiedIndex - index from the push message, or 0.
   * @param {number} messageIndex - index of the message being handled.
   * @returns {string} "poll", "push-missed" or "push".
   */
  _getReason(notifiedIndex, messageIndex) {
    // The returned reason value represents an explanation for why the command associated with the
    // message of the given `messageIndex` is being handled. If `notifiedIndex` is zero the command
    // is a part of a fallback polling process initiated by "Sync Now" ["poll"]. If `notifiedIndex` is
    // greater than `messageIndex` this is a push command that was previously missed ["push-missed"],
    // otherwise we assume this is a push command with no missed messages ["push"].
    if (notifiedIndex == 0) {
      return "poll";
    } else if (notifiedIndex > messageIndex) {
      return "push-missed";
    }
    // Note: The returned reason might be "push" in the case where a user sends multiple tabs
    // in quick succession. We are not attempting to distinguish this from other push cases at
    // present.
    return "push";
  }

  /**
   * Decrypt and dispatch a batch of incoming commands, then notify observers
   * once per command type with everything that was successfully handled.
   * A failure in one command is logged and does not stop the others.
   *
   * @param {Array<{index: number, data: object}>} messages
   * @param {number} notifiedIndex - see `pollDeviceCommands`.
   */
  async _handleCommands(messages, notifiedIndex) {
    try {
      await this._fxai.device.refreshDeviceList();
    } catch (e) {
      log.warn("Error refreshing device list", e);
    }
    // We debounce multiple incoming tabs so we show a single notification.
    const tabsReceived = [];
    const tabsToClose = [];
    for (const { index, data } of messages) {
      const { command, payload, sender: senderId } = data;
      const reason = this._getReason(notifiedIndex, index);
      const sender =
        senderId && this._fxai.device.recentDeviceList
          ? this._fxai.device.recentDeviceList.find(d => d.id == senderId)
          : null;
      if (!sender) {
        log.warn(
          "Incoming command is from an unknown device (maybe disconnected?)"
        );
      }
      const senderName = sender ? sender.name : "Unknown device";
      switch (command) {
        case COMMAND_CLOSETAB:
          try {
            const { urls } = await this.closeTab.handleTabClose(
              senderId,
              payload,
              reason
            );
            // Kept on a single line: a multi-line template literal would put
            // a raw newline and source indentation into the log output.
            log.info(
              `Close Tab received with FxA commands: "${urls.length} tabs" from ${senderName}.`
            );
            // URLs are PII, so only logged at trace.
            log.trace(`Close Remote Tabs received URLs: ${urls}`);
            tabsToClose.push({ urls, sender });
          } catch (e) {
            log.error(`Error while handling incoming Close Tab payload.`, e);
          }
          break;
        case COMMAND_SENDTAB:
          try {
            const { title, uri } = await this.sendTab.handle(
              senderId,
              payload,
              reason
            );
            log.info(
              `Tab received with FxA commands: "${
                title || "<no title>"
              }" from ${senderName}.`
            );
            // URLs are PII, so only logged at trace.
            log.trace(`Tab received URL: ${uri}`);
            // This should eventually be rare to hit as all platforms will be using the same
            // scheme filter list, but we have this here in the case other platforms
            // haven't caught up and/or trying to send invalid uris using older versions
            const scheme = Services.io.newURI(uri).scheme;
            if (lazy.INVALID_SHAREABLE_SCHEMES.has(scheme)) {
              throw new Error("Invalid scheme found for received URI.");
            }
            tabsReceived.push({ title, uri, sender });
          } catch (e) {
            log.error(`Error while handling incoming Send Tab payload.`, e);
          }
          break;
        default:
          log.info(`Unknown command: ${command}.`);
      }
    }
    if (tabsReceived.length) {
      this._notifyFxATabsReceived(tabsReceived);
    }
    if (tabsToClose.length) {
      this._notifyFxATabsClosed(tabsToClose);
    }
  }

  // Notifies "fxaccounts:commands:open-uri" with the received tabs.
  _notifyFxATabsReceived(tabsReceived) {
    Observers.notify("fxaccounts:commands:open-uri", tabsReceived);
  }

  // Notifies "fxaccounts:commands:close-uri" with the tabs to close.
  _notifyFxATabsClosed(tabsToClose) {
    Observers.notify("fxaccounts:commands:close-uri", tabsToClose);
  }
}
|
|
|
|
/**
 * This is built on top of FxA commands.
 *
 * Devices exchange keys wrapped in the oldsync key between themselves (getEncryptedCommandKeys)
 * during the device registration flow. The FxA server can theoretically never
 * retrieve the send tab keys since it doesn't know the oldsync key.
 *
 * Note about the keys:
 * The server has the `pushPublicKey`. The FxA server encrypts the send-tab payload again using the
 * push keys - after the client has encrypted the payload using the send-tab keys.
 * The push keys are different from the send-tab keys. The FxA server uses
 * the push keys to deliver the tabs using the same mechanism we use for web-push.
 * However, clients use the send-tab keys for end-to-end encryption.
 *
 * Every command uses the same key management code, although each has its own key.
 */

export class Command {
  /**
   * @param {FxAccountsCommands} commands - the owning commands object.
   * @param {object} fxAccountsInternal - the FxAccounts internal object.
   */
  constructor(commands, fxAccountsInternal) {
    this._commands = commands;
    this._fxai = fxAccountsInternal;
  }

  // Must be set by the command.
  deviceCapability; // eg, COMMAND_SENDTAB;
  keyFieldName; // eg, "sendTabKeys";
  encryptedKeyFieldName; // eg, "encryptedSendTabKeys"

  // Returns a truthy value (the device's key bundle for this command) if the
  // target device advertises this command, and a falsy value otherwise.
  isDeviceCompatible(device) {
    return (
      device.availableCommands &&
      device.availableCommands[this.deviceCapability]
    );
  }

  /**
   * Encrypt `bytes` for `device` using the keys the device published for
   * this command.
   *
   * @param {Uint8Array} bytes - the cleartext payload.
   * @param {object} device - target device; `availableCommands` and `id` are read.
   * @returns {Promise<string>} the ciphertext, URL-safe base64 encoded.
   * @throws {Error} if the device has no key bundle for this command, or the
   *   bundle was wrapped with a different sync key than ours.
   */
  async _encrypt(bytes, device) {
    const bundle = device.availableCommands[this.deviceCapability];
    if (!bundle) {
      throw new Error(`Device ${device.id} does not have send tab keys.`);
    }
    const oldsyncKey = await this._fxai.keys.getKeyForScope(SCOPE_OLD_SYNC);
    // Older clients expect this to be hex, due to pre-JWK sync key ids :-(
    const ourKid = this._fxai.keys.kidAsHex(oldsyncKey);
    // Parse the bundle once: it carries both the key id and the wrapped keys.
    const json = JSON.parse(bundle);
    const theirKid = json.kid;
    if (theirKid != ourKid) {
      throw new Error("Target Send Tab key ID is different from ours");
    }
    const wrapper = new lazy.CryptoWrapper();
    wrapper.deserialize({ payload: json });
    const syncKeyBundle = lazy.BulkKeyBundle.fromJWK(oldsyncKey);
    let { publicKey, authSecret } = await wrapper.decrypt(syncKeyBundle);
    authSecret = urlsafeBase64Decode(authSecret);
    publicKey = urlsafeBase64Decode(publicKey);

    const { ciphertext: encrypted } = await lazy.PushCrypto.encrypt(
      bytes,
      publicKey,
      authSecret
    );
    return urlsafeBase64Encode(encrypted);
  }

  /**
   * Decrypt an incoming payload with this device's persisted command keys.
   *
   * @param {string} ciphertext - URL-safe base64 encoded ciphertext.
   * @returns {Promise} the result of `PushCrypto.decrypt`.
   * @throws {Error} if no keys are persisted for this command.
   */
  async _decrypt(ciphertext) {
    const commandKeys = await this._getPersistedCommandKeys();
    if (!commandKeys) {
      // Fail with a clear message rather than a destructuring TypeError.
      throw new Error(
        `No ${this.keyFieldName} are persisted; unable to decrypt the incoming command.`
      );
    }
    const { privateKey } = commandKeys;
    const publicKey = urlsafeBase64Decode(commandKeys.publicKey);
    const authSecret = urlsafeBase64Decode(commandKeys.authSecret);
    const cipherBytes = new Uint8Array(urlsafeBase64Decode(ciphertext));
    return lazy.PushCrypto.decrypt(
      privateKey,
      publicKey,
      authSecret,
      // The only Push encoding we support.
      { encoding: "aes128gcm" },
      cipherBytes
    );
  }

  // Returns the key material stored under `device[keyFieldName]`, or a falsy
  // value when there is no device record or no keys yet.
  async _getPersistedCommandKeys() {
    const { device } = await this._fxai.getUserAccountData(["device"]);
    return device && device[this.keyFieldName];
  }

  /**
   * Generate a fresh key pair and auth secret for this command and store them
   * on the device record under `keyFieldName`.
   *
   * @returns {Promise<{publicKey: string, privateKey: *, authSecret: string}>}
   */
  async _generateAndPersistCommandKeys() {
    const [rawPublicKey, privateKey] = await lazy.PushCrypto.generateKeys();
    const publicKey = urlsafeBase64Encode(rawPublicKey);
    const authSecret = urlsafeBase64Encode(
      lazy.PushCrypto.generateAuthenticationSecret()
    );
    const commandKeys = {
      publicKey,
      privateKey,
      authSecret,
    };
    await this._fxai.withCurrentAccountState(async state => {
      const { device } = await state.getUserAccountData(["device"]);
      log.trace(
        `writing to ${this.keyFieldName} for command ${this.deviceCapability}`
      );
      await state.updateUserAccountData({
        // Build a new record rather than mutating the object handed out by
        // storage; spreading also tolerates a missing (null/undefined) device.
        device: { ...device, [this.keyFieldName]: commandKeys },
      });
    });
    return commandKeys;
  }

  // Returns the stored encrypted key bundle (`encryptedKeyFieldName`), if any.
  async _getPersistedEncryptedCommandKey() {
    const data = await this._fxai.getUserAccountData([
      this.encryptedKeyFieldName,
    ]);
    return data[this.encryptedKeyFieldName];
  }

  /**
   * Wrap the public half of this command's keys with the oldsync key, persist
   * the result under `encryptedKeyFieldName` and return it.
   *
   * @returns {Promise<?string>} the JSON-serialized bundle, or null when the
   *   sync key cannot be obtained.
   */
  async _generateAndPersistEncryptedCommandKey() {
    if (!(await this._fxai.keys.canGetKeyForScope(SCOPE_OLD_SYNC))) {
      log.info("Can't fetch keys, so unable to determine command keys");
      return null;
    }
    let commandKeys = await this._getPersistedCommandKeys();
    if (!commandKeys) {
      log.info("Could not find command keys, generating them");
      commandKeys = await this._generateAndPersistCommandKeys();
    }
    // Strip the private key from the bundle to encrypt.
    const keyToEncrypt = {
      publicKey: commandKeys.publicKey,
      authSecret: commandKeys.authSecret,
    };
    let oldsyncKey;
    try {
      oldsyncKey = await this._fxai.keys.getKeyForScope(SCOPE_OLD_SYNC);
    } catch (ex) {
      log.warn("Failed to fetch keys, so unable to determine command keys", ex);
      return null;
    }
    const wrapper = new lazy.CryptoWrapper();
    wrapper.cleartext = keyToEncrypt;
    const keyBundle = lazy.BulkKeyBundle.fromJWK(oldsyncKey);
    await wrapper.encrypt(keyBundle);
    const encryptedCommandKeys = JSON.stringify({
      // This is expected in hex, due to pre-JWK sync key ids :-(
      kid: this._fxai.keys.kidAsHex(oldsyncKey),
      IV: wrapper.IV,
      hmac: wrapper.hmac,
      ciphertext: wrapper.ciphertext,
    });
    await this._fxai.withCurrentAccountState(async state => {
      await state.updateUserAccountData({
        [this.encryptedKeyFieldName]: encryptedCommandKeys,
      });
    });
    return encryptedCommandKeys;
  }

  /**
   * Return the encrypted key bundle to advertise for this command, generating
   * and persisting it first when either the bundle or the raw keys are missing.
   *
   * @returns {Promise<?string>} the bundle, or null if it can't be generated yet.
   */
  async getEncryptedCommandKeys() {
    log.trace("Getting command keys", this.deviceCapability);
    let encryptedCommandKeys = await this._getPersistedEncryptedCommandKey();
    const commandKeys = await this._getPersistedCommandKeys();
    if (!encryptedCommandKeys || !commandKeys) {
      log.info(
        `Generating and persisting encrypted key (${!!encryptedCommandKeys}, ${!!commandKeys})`
      );
      // Generating the encrypted key requires the sync key so we expect to fail
      // in some cases (primary password is locked, account not verified, etc)
      // However, we will eventually end up generating it when we can, and device registration
      // will handle this late update and update the remote record as necessary, so it gets there in the end.
      // It's okay to persist these keys in plain text; they're encrypted.
      encryptedCommandKeys =
        await this._generateAndPersistEncryptedCommandKey();
    }
    return encryptedCommandKeys;
  }
}
|
|
|
|
/**
 * Send Tab
 */
export class SendTab extends Command {
  deviceCapability = COMMAND_SENDTAB;
  keyFieldName = "sendTabKeys";
  encryptedKeyFieldName = "encryptedSendTabKeys";

  /**
   * Encrypt and send a tab to each of the given devices. A failure for one
   * device is logged and recorded in the report; it does not stop the others.
   *
   * @param {Device[]} to - Device objects (typically returned by fxAccounts.getDevicesList()).
   * @param {Object} tab
   * @param {string} tab.url
   * @param {string} tab.title
   * @returns A report object, in the shape of
   * {succeeded: [Device], failed: [{device: Device, error: Exception}]}
   */
  async send(to, tab) {
    log.info(`Sending a tab to ${to.length} devices.`);
    // One flowID for the whole send; each device gets its own streamID.
    const flowID = this._fxai.telemetry.generateFlowID();
    const encoder = new TextEncoder();
    const data = { entries: [{ title: tab.title, url: tab.url }] };
    const report = {
      succeeded: [],
      failed: [],
    };
    for (const device of to) {
      try {
        const streamID = this._fxai.telemetry.generateFlowID();
        const targetData = Object.assign({ flowID, streamID }, data);
        const bytes = encoder.encode(JSON.stringify(targetData));
        const encrypted = await this._encrypt(bytes, device);
        // FxA expects an object as the payload, but we only have a single encrypted string; wrap it.
        // If you add any plaintext items to this payload, please carefully consider the privacy implications
        // of revealing that data to the FxA server.
        const payload = { encrypted };
        await this._commands.invoke(COMMAND_SENDTAB, device, payload);
        this._fxai.telemetry.recordEvent(
          "command-sent",
          COMMAND_SENDTAB_TAIL,
          this._fxai.telemetry.sanitizeDeviceId(device.id),
          { flowID, streamID }
        );
        report.succeeded.push(device);
      } catch (error) {
        log.error("Error while invoking a send tab command.", error);
        report.failed.push({ device, error });
      }
    }
    return report;
  }

  /**
   * Handle incoming send tab payload, called by FxAccountsCommands.
   *
   * @param {string} senderID - id of the sending device (used for telemetry).
   * @param {object} payload - the wrapped payload; only `encrypted` is read.
   * @param {string} reason - why the command is being handled (telemetry).
   * @returns {Promise<{title: string, uri: string}>} the tab to open.
   */
  async handle(senderID, { encrypted }, reason) {
    const bytes = await this._decrypt(encrypted);
    const decoder = new TextDecoder("utf8");
    const data = JSON.parse(decoder.decode(bytes));
    // `flowID` and `streamID` are in the top-level of the JSON, `entries` is
    // an array of "tabs" with `current` being what index is the one we care
    // about, or the last one if not specified.
    const { flowID, streamID, entries } = data;
    // `data` comes from another device, so don't trust it to have an
    // un-shadowed `hasOwnProperty` method.
    const current = Object.hasOwn(data, "current")
      ? data.current
      : entries.length - 1;
    const { title, url: uri } = entries[current];
    this._fxai.telemetry.recordEvent(
      "command-received",
      COMMAND_SENDTAB_TAIL,
      this._fxai.telemetry.sanitizeDeviceId(senderID),
      { flowID, streamID, reason }
    );

    return {
      title,
      uri,
    };
  }
}
|
|
|
|
/**
 * Close Tabs
 */
export class CloseRemoteTab extends Command {
  deviceCapability = COMMAND_CLOSETAB;
  keyFieldName = "closeTabKeys";
  encryptedKeyFieldName = "encryptedCloseTabKeys";

  /**
   * Encrypt and send a "close these tabs" command to a single device.
   *
   * @param {Device} target - Device object (typically returned by fxAccounts.getDevicesList()).
   * @param {String[]} urls - array of urls that should be closed on the remote device
   * @param {string} flowID - telemetry flow id shared by all chunks of one request.
   * @returns {Promise<boolean>} true if the command was sent, false if
   *   anything failed (the error is logged, not thrown).
   */
  async sendCloseTabsCommand(target, urls, flowID) {
    log.info(`Sending tab closures to ${target.id} device.`);
    const encoder = new TextEncoder();
    try {
      const streamID = this._fxai.telemetry.generateFlowID();
      const targetData = { flowID, streamID, urls };
      const bytes = encoder.encode(JSON.stringify(targetData));
      const encrypted = await this._encrypt(bytes, target);
      // FxA expects an object as the payload, but we only have a single encrypted string; wrap it.
      // If you add any plaintext items to this payload, please carefully consider the privacy implications
      // of revealing that data to the FxA server.
      const payload = { encrypted };
      await this._commands.invoke(COMMAND_CLOSETAB, target, payload);
      this._fxai.telemetry.recordEvent(
        "command-sent",
        COMMAND_CLOSETAB_TAIL,
        this._fxai.telemetry.sanitizeDeviceId(target.id),
        { flowID, streamID }
      );
      return true;
    } catch (error) {
      // We should also show the user there was some kind of error
      log.error("Error while invoking a close tab command.", error);
      return false;
    }
  }

  // Returns a truthy value if:
  // - The target device is compatible with closing a tab (device capability) and
  // - The local device has the feature enabled locally
  isDeviceCompatible(device) {
    return (
      lazy.NimbusFeatures.remoteTabManagement.getVariable("closeTabsEnabled") &&
      super.isDeviceCompatible(device)
    );
  }

  /**
   * Handle incoming remote tab payload, called by FxAccountsCommands.
   *
   * @param {string} senderID - id of the sending device (used for telemetry).
   * @param {object} payload - the wrapped payload; only `encrypted` is read.
   * @param {string} reason - why the command is being handled (telemetry).
   * @returns {Promise<{urls: string[]}>} the urls to close.
   */
  async handleTabClose(senderID, { encrypted }, reason) {
    const bytes = await this._decrypt(encrypted);
    const decoder = new TextDecoder("utf8");
    const data = JSON.parse(decoder.decode(bytes));
    // urls is an array of strings
    const { flowID, streamID, urls } = data;
    this._fxai.telemetry.recordEvent(
      "command-received",
      COMMAND_CLOSETAB_TAIL,
      this._fxai.telemetry.sanitizeDeviceId(senderID),
      { flowID, streamID, reason }
    );

    return {
      urls,
    };
  }
}
|
|
|
|
/**
 * Sends queued outgoing tab commands (currently only close-tab) after a short
 * delay, batching per device, and flushes what is pending on shutdown and
 * shortly after startup.
 */
export class CommandQueue {
  // The delay between a command being queued and it being actioned. This delay
  // is primarily to support "undo" functionality in the UI.
  // It's likely we will end up needing a different delay per command (including no delay), but this
  // seems fine while we work that out.
  DELAY = 5000;

  // The timer ID if we have one scheduled, otherwise null
  #timer = null;

  // `this.#onShutdown` bound to `this`.
  #onShutdownBound = null;

  // Since we only ever show one notification to the user
  // we keep track of how many tabs have actually been closed
  // and update the count, user dismissing the notification will
  // reset the count
  closeTabNotificationCount = 0;
  hasPendingCloseTabNotification = false;

  // We ensure the queue is flushed soon after startup. After the first tab sync we see, we
  // wait for this many seconds of being idle before checking.
  // Note that this delay has nothing to do with DELAY - that is for "undo" capability, this
  // delay is to ensure we don't put unnecessary load on the browser during startup.
  #idleThresholdSeconds = 3;
  #isObservingTabSyncs = false;
  // This helps ensure we aren't flushing the queue multiple times concurrently.
  #flushQueuePromise = null;

  /**
   * @param {FxAccountsCommands} commands - used to reach `closeTab`.
   * @param {object} fxAccountsInternal - the FxAccounts internal object.
   */
  constructor(commands, fxAccountsInternal) {
    this._commands = commands;
    this._fxai = fxAccountsInternal;
    Services.obs.addObserver(this, "services.sync.tabs.command-queued");
    Services.obs.addObserver(this, "weave:engine:sync:finish");
    this.#isObservingTabSyncs = true;
    log.trace("Command queue observer created");
    this.#onShutdownBound = this.#onShutdown.bind(this);
    lazy.AsyncShutdown.appShutdownConfirmed.addBlocker(
      "FxAccountsCommands: flush command queue",
      this.#onShutdownBound
    );
  }

  // Used for tests - when in the browser this object lives forever.
  shutdown() {
    if (this.#timer) {
      clearTimeout(this.#timer);
      // Don't keep a stale id around once the timer is cancelled.
      this.#timer = null;
    }
    Services.obs.removeObserver(this, "services.sync.tabs.command-queued");
    if (this.#isObservingTabSyncs) {
      Services.obs.removeObserver(this, "weave:engine:sync:finish");
      this.#isObservingTabSyncs = false;
    }
    lazy.AsyncShutdown.appShutdownConfirmed.removeBlocker(
      this.#onShutdownBound
    );
    this.#onShutdownBound = null;
  }

  // Observer entry point for the two topics registered in the constructor.
  observe(subject, topic, data) {
    log.trace(
      `CommandQueue observed topic=${topic}, data=${data}, subject=${subject}`
    );
    switch (topic) {
      case "services.sync.tabs.command-queued":
        this.flushQueue().catch(e => {
          log.error("Failed to flush the outgoing queue", e);
        });
        break;

      case "weave:engine:sync:finish":
        // This is to pick up pending commands we failed to send in the last session.
        if (data != "tabs") {
          return;
        }
        // Only the first tab sync matters, so stop observing.
        Services.obs.removeObserver(this, "weave:engine:sync:finish");
        this.#isObservingTabSyncs = false;
        this.#checkQueueAfterStartup();
        break;

      default:
        log.error(`unexpected observer topic: ${topic}`);
    }
  }

  // for test mocking.
  _getIdleService() {
    return lazy.idleService;
  }

  // Registers a one-shot idle observer which flushes the queue once the user
  // has been idle for `#idleThresholdSeconds`.
  async #checkQueueAfterStartup() {
    // do this on idle because we are probably syncing quite close to startup.
    const idleService = this._getIdleService();
    const idleObserver = (/* subject, topic, data */) => {
      idleService.removeIdleObserver(idleObserver, this.#idleThresholdSeconds);
      log.info("checking if the command queue is empty now we are idle");
      this.flushQueue()
        .then(didSend => {
          // TODO: it would be good to get telemetry here, because we expect this to be true rarely.
          log.info(
            `pending command check had ${didSend ? "some" : "no"} commands`
          );
        })
        .catch(err => {
          log.error(
            "Checking for pending tab commands after first tab sync failed",
            err
          );
        });
    };
    idleService.addIdleObserver(idleObserver, this.#idleThresholdSeconds);
  }

  /**
   * Send whatever queued commands are ready. Concurrent callers share a
   * single in-flight flush.
   *
   * @param {boolean} isForShutdown - when true, everything pending is sent now.
   * @returns {Promise<boolean>} whether any command was sent.
   */
  async flushQueue(isForShutdown = false) {
    // We really don't want multiple queue flushes concurrently, which is a real possibility.
    // If we are shutting down and there's already a `flushQueue()` running, it's almost certainly
    // not going to be `isForShutdown()`. We don't really want to wait for that to complete just
    // to start another, so there's a risk we will fail to send commands in that scenario - but
    // we will send them at startup time.
    if (this.#flushQueuePromise == null) {
      this.#flushQueuePromise = this.#flushQueue(isForShutdown);
    }
    const inFlight = this.#flushQueuePromise;
    try {
      return await inFlight;
    } finally {
      // Only clear the promise we waited on, so a late waiter can never wipe
      // out a newer flush that has started in the meantime.
      if (this.#flushQueuePromise === inFlight) {
        this.#flushQueuePromise = null;
      }
    }
  }

  // The actual flush; see `flushQueue` for the contract.
  async #flushQueue(isForShutdown) {
    // get all the queued items to work out what's ready to send. If a device has queued item less than
    // our pushDelay, then we don't send *any* command for that device yet, but ensure a timer is set
    // for the delay.
    let store = await lazy.getRemoteCommandStore();
    let pending = await store.getUnsentCommands();
    log.trace("flushQueue total queued items", pending.length);
    // any timeRequested less than `sendThreshold` should be sent now (unless we are shutting down,
    // in which case we send everything now)
    let now = this.now();
    // We want to be efficient with batching commands to send to the user
    // so we categorize things into 3 buckets:
    // mustSend - overdue and should be sent as early as we can
    // canSend - is due but not yet "overdue", should be sent if possible
    // early - can still be undone and should not be sent yet
    // NOTE(review): as written `canSendThreshold` is *older* than
    // `mustSendThreshold`, so every command that passes the canSend test has
    // already been taken by the mustSend branch below and the canSend branch
    // is never reached. Confirm the intended ordering before changing it.
    const mustSendThreshold = isForShutdown ? Infinity : now - this.DELAY;
    const canSendThreshold = isForShutdown ? Infinity : now - this.DELAY * 2;
    // make a map of deviceId -> device
    let recentDevices = this._fxai.device.recentDeviceList;
    // The list may not be available at all (other code in this file guards
    // against that), so treat a missing list the same as an empty one.
    if (!recentDevices?.length) {
      // If we can't map a device ID to the device with the keys etc, we are screwed!
      log.error("No devices available for queued tab commands");
      return false;
    }
    let deviceMap = new Map(recentDevices.map(d => [d.id, d]));
    // make a map of commands keyed by device ID.
    let byDevice = Map.groupBy(pending, c => c.deviceId);
    let nextTime = Infinity;
    let didSend = false;
    for (let [deviceId, commands] of byDevice) {
      let device = deviceMap.get(deviceId);
      if (!device) {
        // If we can't map *this* device ID to a device with the keys etc, we are screwed!
        // This however *is* possible if the target device was disconnected before we had a chance to send it,
        // so remove this item.
        log.warn("Unknown device for queued tab commands", deviceId);
        await Promise.all(
          commands.map(command => store.removeRemoteCommand(deviceId, command))
        );
        continue;
      }
      let toSend = [];
      // We process in reverse order so it's newest-to-oldest
      // which means if the newest is already a "must send"
      // we can simply send all of the "can sends"
      for (const command of commands.reverse()) {
        if (!(command.command instanceof lazy.RemoteCommand.CloseTab)) {
          log.error(`Ignoring unknown pending command ${command}`);
          continue;
        }
        if (command.timeRequested <= mustSendThreshold) {
          log.trace(
            `command for url ${command.command.url} is overdue, adding to send`
          );
          toSend.push(command);
        } else if (command.timeRequested <= canSendThreshold) {
          if (toSend.length) {
            // Single-line message: a multi-line template literal would put a
            // raw newline and source indentation into the log output.
            log.trace(
              `command for url ${command.command.url} is due, since there others to be sent, also adding to send`
            );
            toSend.push(command);
          } else {
            // Though it's due, since there are no others we can check again
            // and see if we can batch
            nextTime = Math.min(nextTime, command.timeRequested + this.DELAY);
          }
        } else {
          // We set the next timer just a little later to ensure we'll have an overdue
          nextTime = Math.min(
            nextTime,
            command.timeRequested + this.DELAY * 1.1
          );
          // Since the list is sorted newest to oldest,
          // we can assume the rest are not ready
          break;
        }
      }

      if (toSend.length) {
        let urlsToClose = toSend.map(c => c.command.url);
        // Generate a flowID to use for all chunked commands
        const flowID = this._fxai.telemetry.generateFlowID();
        // If we're dealing with large sets of urls, we should split them across
        // multiple payloads to prevent breaking the issues for the user
        let chunks = this.chunkUrls(urlsToClose, COMMAND_MAX_PAYLOAD_SIZE);
        for (let chunk of chunks) {
          if (
            await this._commands.closeTab.sendCloseTabsCommand(
              device,
              chunk,
              flowID
            )
          ) {
            // We build a set from the sent urls for faster comparing
            const urlChunkSet = new Set(chunk);
            // success! Mark them as sent.
            for (let cmd of toSend.filter(c =>
              urlChunkSet.has(c.command.url)
            )) {
              log.trace(
                `Setting pending command for device ${deviceId} as sent`,
                cmd
              );
              await store.setPendingCommandSent(cmd);
              didSend = true;
            }
          } else {
            // We should investigate a better backoff strategy
            // https://bugzilla.mozilla.org/show_bug.cgi?id=1899433
            // For now just say 60s.
            log.warn(
              `Failed to send close tab commands for device ${deviceId}`
            );
            nextTime = Math.min(nextTime, now + 60000);
          }
        }
      } else {
        log.trace(`Skipping send for device ${deviceId}`);
      }
    }

    if (didSend) {
      Services.obs.notifyObservers(null, TOPIC_TABS_CHANGED);
    }

    if (nextTime == Infinity) {
      log.info("No new close-tab timer needed");
    } else if (isForShutdown) {
      // because we never delay sending in this case the logic above should never set `nextTime`
      log.error(
        "logic error in command queue manager: flush for shutdown should never set a timer"
      );
    } else {
      let delay = nextTime - now + 10;
      log.trace(`Setting new close-tab timer for ${delay}ms`);
      this._ensureTimer(delay);
    }
    return didSend;
  }

  // Take a an array of urls and a max size and split them into chunks
  // that are smaller than the passed in max size
  // Note: This method modifies the passed in array
  chunkUrls(urls, maxSize) {
    let chunks = [];

    // For optimal packing, we sort the array of urls from shortest-to-longest
    urls.sort((a, b) => a.length - b.length);

    while (urls.length) {
      let chunk = lazy.Utils.tryFitItems(urls, maxSize);
      if (!chunk.length) {
        // None of the remaining URLs can fit into a single command.
        // URLs are PII, so the values themselves are only logged at trace.
        log.warn(`Skipping ${urls.length} oversized URL(s)`);
        urls.forEach(url => {
          log.trace(`Skipping oversized URL: ${url}`);
        });
        break;
      }
      chunks.push(chunk);
      // Remove the processed URLs from the list
      urls.splice(0, chunk.length);
    }
    return chunks;
  }

  // (Re)schedules the flush timer to fire after `timeout` milliseconds,
  // replacing any timer that is already pending.
  async _ensureTimer(timeout) {
    log.info(
      `Setting a new close-tab timer with delay=${timeout} with existing timer=${!!this
        .#timer}`
    );

    if (this.#timer) {
      clearTimeout(this.#timer);
    }

    // If the browser shuts down while a timer exists we should force the send
    // While we should pick up the command after a restart, we don't know
    // how long that will be.
    // See https://bugzilla.mozilla.org/show_bug.cgi?id=1888299
    this.#timer = setTimeout(() => {
      // XXX - this might be racey - if a new timer fires before this promise resolves - it
      // might seem unlikely, but network is involved!
      // flushQueue might create another timer, so we must clear our current timer first.
      this.#timer = null;
      // Nothing awaits a timer callback, so handle failures here rather than
      // leaving a floating rejected promise.
      this.flushQueue().catch(e => {
        log.error("Failed to flush the outgoing queue", e);
      });
    }, timeout);
  }

  // On shutdown we want to send any pending items - ie, pretend the timer fired *now*.
  // Sadly it's not easy for us to abort any in-flight requests, nor to limit the amount of
  // time any new requests we create take, so we don't do this for now. This means that in
  // the case of a super slow network or super slow FxA, we might crash at shutdown, but we
  // can think of doing this in a followup.
  async #onShutdown() {
    // If there is no timer set, then there's nothing pending to do.
    log.debug(
      `CommandQueue shutdown is flushing the queue with a timer=${!!this
        .#timer}`
    );
    if (this.#timer) {
      // We don't want the current one to fire at the same time!
      clearTimeout(this.#timer);
      this.#timer = null;
      await this.flushQueue(true);
    }
  }

  // hook points for tests.
  now() {
    return Date.now();
  }
}
|
|
|
|
// Encodes `buffer` as unpadded URL-safe base64 via ChromeUtils.
function urlsafeBase64Encode(buffer) {
  const bytes = new Uint8Array(buffer);
  const options = { pad: false };
  return ChromeUtils.base64URLEncode(bytes, options);
}
|
|
|
|
// Decodes a URL-safe base64 string via ChromeUtils, with padding set to "reject".
function urlsafeBase64Decode(str) {
  const options = { padding: "reject" };
  return ChromeUtils.base64URLDecode(str, options);
}
|