summaryrefslogtreecommitdiffstats
path: root/dom/push/PushServiceWebSocket.jsm
diff options
context:
space:
mode:
Diffstat (limited to 'dom/push/PushServiceWebSocket.jsm')
-rw-r--r--dom/push/PushServiceWebSocket.jsm1311
1 files changed, 1311 insertions, 0 deletions
diff --git a/dom/push/PushServiceWebSocket.jsm b/dom/push/PushServiceWebSocket.jsm
new file mode 100644
index 0000000000..7dd8b347a2
--- /dev/null
+++ b/dom/push/PushServiceWebSocket.jsm
@@ -0,0 +1,1311 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm");
+const { XPCOMUtils } = ChromeUtils.import(
+ "resource://gre/modules/XPCOMUtils.jsm"
+);
+
+const { PushDB } = ChromeUtils.import("resource://gre/modules/PushDB.jsm");
+const { PushRecord } = ChromeUtils.import(
+ "resource://gre/modules/PushRecord.jsm"
+);
+const { PushCrypto } = ChromeUtils.import(
+ "resource://gre/modules/PushCrypto.jsm"
+);
+
+ChromeUtils.defineModuleGetter(
+ this,
+ "pushBroadcastService",
+ "resource://gre/modules/PushBroadcastService.jsm"
+);
+ChromeUtils.defineModuleGetter(
+ this,
+ "ObjectUtils",
+ "resource://gre/modules/ObjectUtils.jsm"
+);
+
+const kPUSHWSDB_DB_NAME = "pushapi";
+const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
+const kPUSHWSDB_STORE_NAME = "pushapi";
+
+// WebSocket close code sent by the server to indicate that the client should
+// not automatically reconnect.
+const kBACKOFF_WS_STATUS_CODE = 4774;
+
+// Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
+// included in request payloads.
+const kACK_STATUS_TO_CODE = {
+ [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
+ [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
+ [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
+};
+
+const kUNREGISTER_REASON_TO_CODE = {
+ [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
+ [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
+ [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
+};
+
+const kDELIVERY_REASON_TO_CODE = {
+ [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
+ [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
+ [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
+};
+
+const prefs = Services.prefs.getBranch("dom.push.");
+
+const EXPORTED_SYMBOLS = ["PushServiceWebSocket"];
+
+XPCOMUtils.defineLazyGetter(this, "console", () => {
+ let { ConsoleAPI } = ChromeUtils.import("resource://gre/modules/Console.jsm");
+ return new ConsoleAPI({
+ maxLogLevelPref: "dom.push.loglevel",
+ prefix: "PushServiceWebSocket",
+ });
+});
+
+/**
+ * A proxy between the PushService and the WebSocket. The listener is used so
+ * that the PushService can silence messages from the WebSocket by setting
+ * PushWebSocketListener._pushService to null. This is required because
+ * a WebSocket can continue to send messages or errors after it has been
+ * closed but the PushService may not be interested in these. It's easier to
+ * stop listening than to have checks at specific points.
+ */
+var PushWebSocketListener = function(pushService) {
+ this._pushService = pushService;
+};
+
+PushWebSocketListener.prototype = {
+ onStart(context) {
+ if (!this._pushService) {
+ return;
+ }
+ this._pushService._wsOnStart(context);
+ },
+
+ onStop(context, statusCode) {
+ if (!this._pushService) {
+ return;
+ }
+ this._pushService._wsOnStop(context, statusCode);
+ },
+
+ onAcknowledge(context, size) {
+ // EMPTY
+ },
+
+ onBinaryMessageAvailable(context, message) {
+ // EMPTY
+ },
+
+ onMessageAvailable(context, message) {
+ if (!this._pushService) {
+ return;
+ }
+ this._pushService._wsOnMessageAvailable(context, message);
+ },
+
+ onServerClose(context, aStatusCode, aReason) {
+ if (!this._pushService) {
+ return;
+ }
+ this._pushService._wsOnServerClose(context, aStatusCode, aReason);
+ },
+};
+
+// websocket states
+// websocket is off
+const STATE_SHUT_DOWN = 0;
+// Websocket has been opened on client side, waiting for successful open.
+// (_wsOnStart)
+const STATE_WAITING_FOR_WS_START = 1;
+// Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
+const STATE_WAITING_FOR_HELLO = 2;
+// Websocket operational, handshake completed, begin protocol messaging.
+const STATE_READY = 3;
+
+var PushServiceWebSocket = {
+ _mainPushService: null,
+ _serverURI: null,
+ _currentlyRegistering: new Set(),
+
+ newPushDB() {
+ return new PushDB(
+ kPUSHWSDB_DB_NAME,
+ kPUSHWSDB_DB_VERSION,
+ kPUSHWSDB_STORE_NAME,
+ "channelID",
+ PushRecordWebSocket
+ );
+ },
+
+ disconnect() {
+ this._shutdownWS();
+ },
+
+ observe(aSubject, aTopic, aData) {
+ if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
+ this._onUAIDChanged();
+ } else if (aTopic == "timer-callback") {
+ this._onTimerFired(aSubject);
+ }
+ },
+
+ /**
+ * Handles a UAID change. Unlike reconnects, we cancel all pending requests
+ * after disconnecting. Existing subscriptions stored in IndexedDB will be
+ * dropped on reconnect.
+ */
+ _onUAIDChanged() {
+ console.debug("onUAIDChanged()");
+
+ this._shutdownWS();
+ this._startBackoffTimer();
+ },
+
+ /** Handles a ping, backoff, or request timeout timer event. */
+ _onTimerFired(timer) {
+ console.debug("onTimerFired()");
+
+ if (timer == this._pingTimer) {
+ this._sendPing();
+ return;
+ }
+
+ if (timer == this._backoffTimer) {
+ console.debug("onTimerFired: Reconnecting after backoff");
+ this._beginWSSetup();
+ return;
+ }
+
+ if (timer == this._requestTimeoutTimer) {
+ this._timeOutRequests();
+ }
+ },
+
+ /**
+ * Sends a ping to the server. Bypasses the request queue, but starts the
+ * request timeout timer. If the socket is already closed, or the server
+ * does not respond within the timeout, the client will reconnect.
+ */
+ _sendPing() {
+ console.debug("sendPing()");
+
+ this._startRequestTimeoutTimer();
+ try {
+ this._wsSendMessage({});
+ this._lastPingTime = Date.now();
+ } catch (e) {
+ console.debug("sendPing: Error sending ping", e);
+ this._reconnect();
+ }
+ },
+
+ /** Times out any pending requests. */
+ _timeOutRequests() {
+ console.debug("timeOutRequests()");
+
+ if (!this._hasPendingRequests()) {
+ // Cancel the repeating timer and exit early if we aren't waiting for
+ // pongs or requests.
+ this._requestTimeoutTimer.cancel();
+ return;
+ }
+
+ let now = Date.now();
+
+ // Set to true if at least one request timed out, or we're still waiting
+ // for a pong after the request timeout.
+ let requestTimedOut = false;
+
+ if (
+ this._lastPingTime > 0 &&
+ now - this._lastPingTime > this._requestTimeout
+ ) {
+ console.debug("timeOutRequests: Did not receive pong in time");
+ requestTimedOut = true;
+ } else {
+ for (let [key, request] of this._pendingRequests) {
+ let duration = now - request.ctime;
+ // If any of the registration requests time out, all the ones after it
+ // also made to fail, since we are going to be disconnecting the
+ // socket.
+ requestTimedOut |= duration > this._requestTimeout;
+ if (requestTimedOut) {
+ request.reject(new Error("Request timed out: " + key));
+ this._pendingRequests.delete(key);
+ }
+ }
+ }
+
+ // The most likely reason for a pong or registration request timing out is
+ // that the socket has disconnected. Best to reconnect.
+ if (requestTimedOut) {
+ this._reconnect();
+ }
+ },
+
+ get _UAID() {
+ return prefs.getStringPref("userAgentID");
+ },
+
+ set _UAID(newID) {
+ if (typeof newID !== "string") {
+ console.warn(
+ "Got invalid, non-string UAID",
+ newID,
+ "Not updating userAgentID"
+ );
+ return;
+ }
+ console.debug("New _UAID", newID);
+ prefs.setStringPref("userAgentID", newID);
+ },
+
+ _ws: null,
+ _pendingRequests: new Map(),
+ _currentState: STATE_SHUT_DOWN,
+ _requestTimeout: 0,
+ _requestTimeoutTimer: null,
+ _retryFailCount: 0,
+
+ /**
+ * According to the WS spec, servers should immediately close the underlying
+ * TCP connection after they close a WebSocket. This causes wsOnStop to be
+ * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
+ * WebSocket up, it should try to reconnect. But if the server closes the
+ * WebSocket because it wants the client to back off, then the client
+ * shouldn't re-establish the connection. If the server sends the backoff
+ * close code, this field will be set to true in wsOnServerClose. It is
+ * checked in wsOnStop.
+ */
+ _skipReconnect: false,
+
+ /** Indicates whether the server supports Web Push-style message delivery. */
+ _dataEnabled: false,
+
+ /**
+ * The last time the client sent a ping to the server. If non-zero, keeps the
+ * request timeout timer active. Reset to zero when the server responds with
+ * a pong or pending messages.
+ */
+ _lastPingTime: 0,
+
+ /**
+ * A one-shot timer used to ping the server, to avoid timing out idle
+ * connections. Reset to the ping interval on each incoming message.
+ */
+ _pingTimer: null,
+
+ /** A one-shot timer fired after the reconnect backoff period. */
+ _backoffTimer: null,
+
+ /**
+ * Sends a message to the Push Server through an open websocket.
+ * typeof(msg) shall be an object
+ */
+ _wsSendMessage(msg) {
+ if (!this._ws) {
+ console.warn(
+ "wsSendMessage: No WebSocket initialized.",
+ "Cannot send a message"
+ );
+ return;
+ }
+ msg = JSON.stringify(msg);
+ console.debug("wsSendMessage: Sending message", msg);
+ this._ws.sendMsg(msg);
+ },
+
+ init(options, mainPushService, serverURI) {
+ console.debug("init()");
+
+ this._mainPushService = mainPushService;
+ this._serverURI = serverURI;
+ // Filled in at connect() time
+ this._broadcastListeners = null;
+
+ // Override the default WebSocket factory function. The returned object
+ // must be null or satisfy the nsIWebSocketChannel interface. Used by
+ // the tests to provide a mock WebSocket implementation.
+ if (options.makeWebSocket) {
+ this._makeWebSocket = options.makeWebSocket;
+ }
+
+ this._requestTimeout = prefs.getIntPref("requestTimeout");
+
+ return Promise.resolve();
+ },
+
+ _reconnect() {
+ console.debug("reconnect()");
+ this._shutdownWS(false);
+ this._startBackoffTimer();
+ },
+
+ _shutdownWS(shouldCancelPending = true) {
+ console.debug("shutdownWS()");
+
+ if (this._currentState == STATE_READY) {
+ prefs.removeObserver("userAgentID", this);
+ }
+
+ this._currentState = STATE_SHUT_DOWN;
+ this._skipReconnect = false;
+
+ if (this._wsListener) {
+ this._wsListener._pushService = null;
+ }
+ try {
+ this._ws.close(0, null);
+ } catch (e) {}
+ this._ws = null;
+
+ this._lastPingTime = 0;
+
+ if (this._pingTimer) {
+ this._pingTimer.cancel();
+ }
+
+ if (shouldCancelPending) {
+ this._cancelPendingRequests();
+ }
+
+ if (this._notifyRequestQueue) {
+ this._notifyRequestQueue();
+ this._notifyRequestQueue = null;
+ }
+ },
+
+ uninit() {
+ // All pending requests (ideally none) are dropped at this point. We
+ // shouldn't have any applications performing registration/unregistration
+ // or receiving notifications.
+ this._shutdownWS();
+
+ if (this._backoffTimer) {
+ this._backoffTimer.cancel();
+ }
+ if (this._requestTimeoutTimer) {
+ this._requestTimeoutTimer.cancel();
+ }
+
+ this._mainPushService = null;
+
+ this._dataEnabled = false;
+ },
+
+ /**
+ * How retries work: If the WS is closed due to a socket error,
+ * _startBackoffTimer() is called. The retry timer is started and when
+ * it times out, beginWSSetup() is called again.
+ *
+ * If we are in the middle of a timeout (i.e. waiting), but
+ * a register/unregister is called, we don't want to wait around anymore.
+ * _sendRequest will automatically call beginWSSetup(), which will cancel the
+ * timer. In addition since the state will have changed, even if a pending
+ * timer event comes in (because the timer fired the event before it was
+ * cancelled), so the connection won't be reset.
+ */
+ _startBackoffTimer() {
+ console.debug("startBackoffTimer()");
+
+ // Calculate new timeout, but cap it to pingInterval.
+ let retryTimeout =
+ prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
+ retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
+
+ this._retryFailCount++;
+
+ console.debug(
+ "startBackoffTimer: Retry in",
+ retryTimeout,
+ "Try number",
+ this._retryFailCount
+ );
+
+ if (!this._backoffTimer) {
+ this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
+ Ci.nsITimer
+ );
+ }
+ this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
+ },
+
+ /** Indicates whether we're waiting for pongs or requests. */
+ _hasPendingRequests() {
+ return this._lastPingTime > 0 || this._pendingRequests.size > 0;
+ },
+
+ /**
+ * Starts the request timeout timer unless we're already waiting for a pong
+ * or register request.
+ */
+ _startRequestTimeoutTimer() {
+ if (this._hasPendingRequests()) {
+ return;
+ }
+ if (!this._requestTimeoutTimer) {
+ this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
+ Ci.nsITimer
+ );
+ }
+ this._requestTimeoutTimer.init(
+ this,
+ this._requestTimeout,
+ Ci.nsITimer.TYPE_REPEATING_SLACK
+ );
+ },
+
+ /** Starts or resets the ping timer. */
+ _startPingTimer() {
+ if (!this._pingTimer) {
+ this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
+ }
+ this._pingTimer.init(
+ this,
+ prefs.getIntPref("pingInterval"),
+ Ci.nsITimer.TYPE_ONE_SHOT
+ );
+ },
+
+ _makeWebSocket(uri) {
+ if (!prefs.getBoolPref("connection.enabled")) {
+ console.warn(
+ "makeWebSocket: connection.enabled is not set to true.",
+ "Aborting."
+ );
+ return null;
+ }
+ if (Services.io.offline) {
+ console.warn("makeWebSocket: Network is offline.");
+ return null;
+ }
+ let contractId =
+ uri.scheme == "ws"
+ ? "@mozilla.org/network/protocol;1?name=ws"
+ : "@mozilla.org/network/protocol;1?name=wss";
+ let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
+
+ socket.initLoadInfo(
+ null, // aLoadingNode
+ Services.scriptSecurityManager.getSystemPrincipal(),
+ null, // aTriggeringPrincipal
+ Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
+ Ci.nsIContentPolicy.TYPE_WEBSOCKET
+ );
+ // Allow deprecated HTTP request from SystemPrincipal
+ socket.loadInfo.allowDeprecatedSystemRequests = true;
+
+ return socket;
+ },
+
+ _beginWSSetup() {
+ console.debug("beginWSSetup()");
+ if (this._currentState != STATE_SHUT_DOWN) {
+ console.error(
+ "_beginWSSetup: Not in shutdown state! Current state",
+ this._currentState
+ );
+ return;
+ }
+
+ // Stop any pending reconnects scheduled for the near future.
+ if (this._backoffTimer) {
+ this._backoffTimer.cancel();
+ }
+
+ let uri = this._serverURI;
+ if (!uri) {
+ return;
+ }
+ let socket = this._makeWebSocket(uri);
+ if (!socket) {
+ return;
+ }
+ this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
+
+ console.debug("beginWSSetup: Connecting to", uri.spec);
+ this._wsListener = new PushWebSocketListener(this);
+ this._ws.protocol = "push-notification";
+
+ try {
+ // Grab a wakelock before we open the socket to ensure we don't go to
+ // sleep before connection the is opened.
+ this._ws.asyncOpen(uri, uri.spec, 0, this._wsListener, null);
+ this._currentState = STATE_WAITING_FOR_WS_START;
+ } catch (e) {
+ console.error(
+ "beginWSSetup: Error opening websocket.",
+ "asyncOpen failed",
+ e
+ );
+ this._reconnect();
+ }
+ },
+
+ connect(broadcastListeners) {
+ console.debug("connect()", broadcastListeners);
+ this._broadcastListeners = broadcastListeners;
+ this._beginWSSetup();
+ },
+
+ isConnected() {
+ return !!this._ws;
+ },
+
+ /**
+ * Protocol handler invoked by server message.
+ */
+ _handleHelloReply(reply) {
+ console.debug("handleHelloReply()");
+ if (this._currentState != STATE_WAITING_FOR_HELLO) {
+ console.error(
+ "handleHelloReply: Unexpected state",
+ this._currentState,
+ "(expected STATE_WAITING_FOR_HELLO)"
+ );
+ this._shutdownWS();
+ return;
+ }
+
+ if (typeof reply.uaid !== "string") {
+ console.error("handleHelloReply: Received invalid UAID", reply.uaid);
+ this._shutdownWS();
+ return;
+ }
+
+ if (reply.uaid === "") {
+ console.error("handleHelloReply: Received empty UAID");
+ this._shutdownWS();
+ return;
+ }
+
+ // To avoid sticking extra large values sent by an evil server into prefs.
+ if (reply.uaid.length > 128) {
+ console.error(
+ "handleHelloReply: UAID received from server was too long",
+ reply.uaid
+ );
+ this._shutdownWS();
+ return;
+ }
+
+ let sendRequests = () => {
+ if (this._notifyRequestQueue) {
+ this._notifyRequestQueue();
+ this._notifyRequestQueue = null;
+ }
+ this._sendPendingRequests();
+ };
+
+ function finishHandshake() {
+ this._UAID = reply.uaid;
+ this._currentState = STATE_READY;
+ prefs.addObserver("userAgentID", this);
+
+ // Handle broadcasts received in response to the "hello" message.
+ if (!ObjectUtils.isEmpty(reply.broadcasts)) {
+ // The reply isn't technically a broadcast message, but it has
+ // the shape of a broadcast message (it has a broadcasts field).
+ const context = { phase: pushBroadcastService.PHASES.HELLO };
+ this._mainPushService.receivedBroadcastMessage(reply, context);
+ }
+
+ this._dataEnabled = !!reply.use_webpush;
+ if (this._dataEnabled) {
+ this._mainPushService
+ .getAllUnexpired()
+ .then(records =>
+ Promise.all(
+ records.map(record =>
+ this._mainPushService.ensureCrypto(record).catch(error => {
+ console.error(
+ "finishHandshake: Error updating record",
+ record.keyID,
+ error
+ );
+ })
+ )
+ )
+ )
+ .then(sendRequests);
+ } else {
+ sendRequests();
+ }
+ }
+
+ // By this point we've got a UAID from the server that we are ready to
+ // accept.
+ //
+ // We unconditionally drop all existing registrations and notify service
+ // workers if we receive a new UAID. This ensures we expunge all stale
+ // registrations if the `userAgentID` pref is reset.
+ if (this._UAID != reply.uaid) {
+ console.debug("handleHelloReply: Received new UAID");
+
+ this._mainPushService
+ .dropUnexpiredRegistrations()
+ .then(finishHandshake.bind(this));
+
+ return;
+ }
+
+ // otherwise we are good to go
+ finishHandshake.bind(this)();
+ },
+
+ /**
+ * Protocol handler invoked by server message.
+ */
+ _handleRegisterReply(reply) {
+ console.debug("handleRegisterReply()");
+
+ let tmp = this._takeRequestForReply(reply);
+ if (!tmp) {
+ return;
+ }
+
+ if (reply.status == 200) {
+ try {
+ Services.io.newURI(reply.pushEndpoint);
+ } catch (e) {
+ tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
+ return;
+ }
+
+ let record = new PushRecordWebSocket({
+ channelID: reply.channelID,
+ pushEndpoint: reply.pushEndpoint,
+ scope: tmp.record.scope,
+ originAttributes: tmp.record.originAttributes,
+ version: null,
+ systemRecord: tmp.record.systemRecord,
+ appServerKey: tmp.record.appServerKey,
+ ctime: Date.now(),
+ });
+ tmp.resolve(record);
+ } else {
+ console.error("handleRegisterReply: Unexpected server response", reply);
+ tmp.reject(
+ new Error("Wrong status code for register reply: " + reply.status)
+ );
+ }
+ },
+
+ _handleUnregisterReply(reply) {
+ console.debug("handleUnregisterReply()");
+
+ let request = this._takeRequestForReply(reply);
+ if (!request) {
+ return;
+ }
+
+ let success = reply.status === 200;
+ request.resolve(success);
+ },
+
+ _handleDataUpdate(update) {
+ let promise;
+ if (typeof update.channelID != "string") {
+ console.warn(
+ "handleDataUpdate: Discarding update without channel ID",
+ update
+ );
+ return;
+ }
+ function updateRecord(record) {
+ // Ignore messages that we've already processed. This can happen if the
+ // connection drops between notifying the service worker and acking the
+ // the message. In that case, the server will re-send the message on
+ // reconnect.
+ if (record.hasRecentMessageID(update.version)) {
+ console.warn(
+ "handleDataUpdate: Ignoring duplicate message",
+ update.version
+ );
+ return null;
+ }
+ record.noteRecentMessageID(update.version);
+ return record;
+ }
+ if (typeof update.data != "string") {
+ promise = this._mainPushService.receivedPushMessage(
+ update.channelID,
+ update.version,
+ null,
+ null,
+ updateRecord
+ );
+ } else {
+ let message = ChromeUtils.base64URLDecode(update.data, {
+ // The Push server may append padding.
+ padding: "ignore",
+ });
+ promise = this._mainPushService.receivedPushMessage(
+ update.channelID,
+ update.version,
+ update.headers,
+ message,
+ updateRecord
+ );
+ }
+ promise
+ .then(
+ status => {
+ this._sendAck(update.channelID, update.version, status);
+ },
+ err => {
+ console.error(
+ "handleDataUpdate: Error delivering message",
+ update,
+ err
+ );
+ this._sendAck(
+ update.channelID,
+ update.version,
+ Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
+ );
+ }
+ )
+ .catch(err => {
+ console.error(
+ "handleDataUpdate: Error acknowledging message",
+ update,
+ err
+ );
+ });
+ },
+
+ /**
+ * Protocol handler invoked by server message.
+ */
+ _handleNotificationReply(reply) {
+ console.debug("handleNotificationReply()");
+ if (this._dataEnabled) {
+ this._handleDataUpdate(reply);
+ return;
+ }
+
+ if (typeof reply.updates !== "object") {
+ console.warn("handleNotificationReply: Missing updates", reply.updates);
+ return;
+ }
+
+ console.debug("handleNotificationReply: Got updates", reply.updates);
+ for (let i = 0; i < reply.updates.length; i++) {
+ let update = reply.updates[i];
+ console.debug("handleNotificationReply: Handling update", update);
+ if (typeof update.channelID !== "string") {
+ console.debug(
+ "handleNotificationReply: Invalid update at index",
+ i,
+ update
+ );
+ continue;
+ }
+
+ if (update.version === undefined) {
+ console.debug("handleNotificationReply: Missing version", update);
+ continue;
+ }
+
+ let version = update.version;
+
+ if (typeof version === "string") {
+ version = parseInt(version, 10);
+ }
+
+ if (typeof version === "number" && version >= 0) {
+ // FIXME(nsm): this relies on app update notification being infallible!
+ // eventually fix this
+ this._receivedUpdate(update.channelID, version);
+ }
+ }
+ },
+
+ _handleBroadcastReply(reply) {
+ let phase = pushBroadcastService.PHASES.BROADCAST;
+ // Check if this reply is the result of registration.
+ for (const id of Object.keys(reply.broadcasts)) {
+ const wasRegistering = this._currentlyRegistering.delete(id);
+ if (wasRegistering) {
+ // If we get multiple broadcasts and only one is "registering",
+ // then we consider the phase to be REGISTER for all of them.
+ // It is acceptable since registrations do not happen so often,
+ // and are all very likely to occur soon after browser startup.
+ phase = pushBroadcastService.PHASES.REGISTER;
+ }
+ }
+ const context = { phase };
+ this._mainPushService.receivedBroadcastMessage(reply, context);
+ },
+
+ reportDeliveryError(messageID, reason) {
+ console.debug("reportDeliveryError()");
+ let code = kDELIVERY_REASON_TO_CODE[reason];
+ if (!code) {
+ throw new Error("Invalid delivery error reason");
+ }
+ let data = { messageType: "nack", version: messageID, code };
+ this._queueRequest(data);
+ },
+
+ _sendAck(channelID, version, status) {
+ console.debug("sendAck()");
+ let code = kACK_STATUS_TO_CODE[status];
+ if (!code) {
+ throw new Error("Invalid ack status");
+ }
+ let data = { messageType: "ack", updates: [{ channelID, version, code }] };
+ this._queueRequest(data);
+ },
+
+ _generateID() {
+ let uuidGenerator = Cc["@mozilla.org/uuid-generator;1"].getService(
+ Ci.nsIUUIDGenerator
+ );
+ // generateUUID() gives a UUID surrounded by {...}, slice them off.
+ return uuidGenerator
+ .generateUUID()
+ .toString()
+ .slice(1, -1);
+ },
+
+ register(record) {
+ console.debug("register() ", record);
+
+ let data = { channelID: this._generateID(), messageType: "register" };
+
+ if (record.appServerKey) {
+ data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
+ // The Push server requires padding.
+ pad: true,
+ });
+ }
+
+ return this._sendRequestForReply(record, data).then(record => {
+ if (!this._dataEnabled) {
+ return record;
+ }
+ return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
+ record.p256dhPublicKey = publicKey;
+ record.p256dhPrivateKey = privateKey;
+ record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
+ return record;
+ });
+ });
+ },
+
+ unregister(record, reason) {
+ console.debug("unregister() ", record, reason);
+
+ return Promise.resolve().then(_ => {
+ let code = kUNREGISTER_REASON_TO_CODE[reason];
+ if (!code) {
+ throw new Error("Invalid unregister reason");
+ }
+ let data = {
+ channelID: record.channelID,
+ messageType: "unregister",
+ code,
+ };
+
+ return this._sendRequestForReply(record, data);
+ });
+ },
+
+ _queueStart: Promise.resolve(),
+ _notifyRequestQueue: null,
+ _queue: null,
+ _enqueue(op) {
+ console.debug("enqueue()");
+ if (!this._queue) {
+ this._queue = this._queueStart;
+ }
+ this._queue = this._queue.then(op).catch(_ => {});
+ },
+
+ /** Sends a request to the server. */
+ _send(data) {
+ if (this._currentState != STATE_READY) {
+ console.warn(
+ "send: Unexpected state; ignoring message",
+ this._currentState
+ );
+ return;
+ }
+ if (!this._requestHasReply(data)) {
+ this._wsSendMessage(data);
+ return;
+ }
+ // If we're expecting a reply, check that we haven't cancelled the request.
+ let key = this._makePendingRequestKey(data);
+ if (!this._pendingRequests.has(key)) {
+ console.log("send: Request cancelled; ignoring message", key);
+ return;
+ }
+ this._wsSendMessage(data);
+ },
+
+ /** Indicates whether a request has a corresponding reply from the server. */
+ _requestHasReply(data) {
+ return data.messageType == "register" || data.messageType == "unregister";
+ },
+
+ /**
+ * Sends all pending requests that expect replies. Called after the connection
+ * is established and the handshake is complete.
+ */
+ _sendPendingRequests() {
+ this._enqueue(_ => {
+ for (let request of this._pendingRequests.values()) {
+ this._send(request.data);
+ }
+ });
+ },
+
+ /** Queues an outgoing request, establishing a connection if necessary. */
+ _queueRequest(data) {
+ console.debug("queueRequest()", data);
+
+ if (this._currentState == STATE_READY) {
+ // If we're ready, no need to queue; just send the request.
+ this._send(data);
+ return;
+ }
+
+ // Otherwise, we're still setting up. If we don't have a request queue,
+ // make one now.
+ if (!this._notifyRequestQueue) {
+ let promise = new Promise((resolve, reject) => {
+ this._notifyRequestQueue = resolve;
+ });
+ this._enqueue(_ => promise);
+ }
+
+ let isRequest = this._requestHasReply(data);
+ if (!isRequest) {
+ // Don't queue requests, since they're stored in `_pendingRequests`, and
+ // `_sendPendingRequests` will send them after reconnecting. Without this
+ // check, we'd send requests twice.
+ this._enqueue(_ => this._send(data));
+ }
+
+ if (!this._ws) {
+ // This will end up calling notifyRequestQueue().
+ this._beginWSSetup();
+ // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
+ // not be call.
+ if (!this._ws && this._notifyRequestQueue) {
+ this._notifyRequestQueue();
+ this._notifyRequestQueue = null;
+ }
+ }
+ },
+
+ _receivedUpdate(aChannelID, aLatestVersion) {
+ console.debug("receivedUpdate: Updating", aChannelID, "->", aLatestVersion);
+
+ this._mainPushService
+ .receivedPushMessage(aChannelID, "", null, null, record => {
+ if (record.version === null || record.version < aLatestVersion) {
+ console.debug(
+ "receivedUpdate: Version changed for",
+ aChannelID,
+ aLatestVersion
+ );
+ record.version = aLatestVersion;
+ return record;
+ }
+ console.debug(
+ "receivedUpdate: No significant version change for",
+ aChannelID,
+ aLatestVersion
+ );
+ return null;
+ })
+ .then(status => {
+ this._sendAck(aChannelID, aLatestVersion, status);
+ })
+ .catch(err => {
+ console.error(
+ "receivedUpdate: Error acknowledging message",
+ aChannelID,
+ aLatestVersion,
+ err
+ );
+ });
+ },
+
+ // begin Push protocol handshake
+ _wsOnStart(context) {
+ console.debug("wsOnStart()");
+
+ if (this._currentState != STATE_WAITING_FOR_WS_START) {
+ console.error(
+ "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
+ "state",
+ this._currentState,
+ "Skipping"
+ );
+ return;
+ }
+
+ this._mainPushService
+ .getAllUnexpired()
+ .then(
+ records => this._sendHello(records),
+ err => {
+ console.warn(
+ "Error fetching existing records before handshake; assuming none",
+ err
+ );
+ this._sendHello([]);
+ }
+ )
+ .catch(err => {
+ // If we failed to send the handshake, back off and reconnect.
+ console.warn("Failed to send handshake; reconnecting", err);
+ this._reconnect();
+ });
+ },
+
+ /**
+ * Sends a `hello` handshake to the server.
+ *
+ * @param {Array<PushRecordWebSocket>} An array of records for existing
+ * subscriptions, used to determine whether to rotate our UAID.
+ */
+ _sendHello(records) {
+ let data = {
+ messageType: "hello",
+ broadcasts: this._broadcastListeners,
+ use_webpush: true,
+ };
+
+ if (records.length && this._UAID) {
+ // Only send our UAID if we have existing push subscriptions, to
+ // avoid tying a persistent identifier to the connection (bug
+ // 1617136). The push server will issue our client a new UAID in
+ // the `hello` response, which we'll store until either the next
+ // time we reconnect, or the user subscribes to push. Once we have a
+ // push subscription, we'll stop rotating the UAID when we connect,
+ // so that we can receive push messages for them.
+ data.uaid = this._UAID;
+ }
+
+ this._wsSendMessage(data);
+ this._currentState = STATE_WAITING_FOR_HELLO;
+ },
+
+ /**
+ * This statusCode is not the websocket protocol status code, but the TCP
+ * connection close status code.
+ *
+ * If we do not explicitly call ws.close() then statusCode is always
+ * NS_BASE_STREAM_CLOSED, even on a successful close.
+ */
+ _wsOnStop(context, statusCode) {
+ console.debug("wsOnStop()");
+
+ if (statusCode != Cr.NS_OK && !this._skipReconnect) {
+ console.debug("wsOnStop: Reconnecting after socket error", statusCode);
+ this._reconnect();
+ return;
+ }
+
+ this._shutdownWS();
+ },
+
+ _wsOnMessageAvailable(context, message) {
+ console.debug("wsOnMessageAvailable()", message);
+
+ // Clearing the last ping time indicates we're no longer waiting for a pong.
+ this._lastPingTime = 0;
+
+ let reply;
+ try {
+ reply = JSON.parse(message);
+ } catch (e) {
+ console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
+ return;
+ }
+
+ // If we receive a message, we know the connection succeeded. Reset the
+ // connection attempt and ping interval counters.
+ this._retryFailCount = 0;
+
+ let doNotHandle = false;
+ if (
+ message === "{}" ||
+ reply.messageType === undefined ||
+ reply.messageType === "ping" ||
+ typeof reply.messageType != "string"
+ ) {
+ console.debug("wsOnMessageAvailable: Pong received");
+ doNotHandle = true;
+ }
+
+ // Reset the ping timer. Note: This path is executed at every step of the
+ // handshake, so this timer does not need to be set explicitly at startup.
+ this._startPingTimer();
+
+ // If it is a ping, do not handle the message.
+ if (doNotHandle) {
+ return;
+ }
+
+ // A whitelist of protocol handlers. Add to these if new messages are added
+ // in the protocol.
+ let handlers = [
+ "Hello",
+ "Register",
+ "Unregister",
+ "Notification",
+ "Broadcast",
+ ];
+
+ // Build up the handler name to call from messageType.
+ // e.g. messageType == "register" -> _handleRegisterReply.
+ let handlerName =
+ reply.messageType[0].toUpperCase() +
+ reply.messageType.slice(1).toLowerCase();
+
+ if (!handlers.includes(handlerName)) {
+ console.warn(
+ "wsOnMessageAvailable: No whitelisted handler",
+ handlerName,
+ "for message",
+ reply.messageType
+ );
+ return;
+ }
+
+ let handler = "_handle" + handlerName + "Reply";
+
+ if (typeof this[handler] !== "function") {
+ console.warn(
+ "wsOnMessageAvailable: Handler",
+ handler,
+ "whitelisted but not implemented"
+ );
+ return;
+ }
+
+ this[handler](reply);
+ },
+
+ /**
+ * The websocket should never be closed. Since we don't call ws.close(),
+ * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
+ * function), which calls reconnect and re-establishes the WebSocket
+ * connection.
+ *
+ * If the server requested that we back off, we won't reconnect until the
+ * next network state change event, or until we need to send a new register
+ * request.
+ */
+ _wsOnServerClose(context, aStatusCode, aReason) {
+ console.debug("wsOnServerClose()", aStatusCode, aReason);
+
+ if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
+ console.debug("wsOnServerClose: Skipping automatic reconnect");
+ this._skipReconnect = true;
+ }
+ },
+
+ /**
+ * Rejects all pending register requests with errors.
+ */
+ _cancelPendingRequests() {
+ for (let request of this._pendingRequests.values()) {
+ request.reject(new Error("Request aborted"));
+ }
+ this._pendingRequests.clear();
+ },
+
+ /** Creates a case-insensitive map key for a request that expects a reply. */
+ _makePendingRequestKey(data) {
+ return (data.messageType + "|" + data.channelID).toLowerCase();
+ },
+
+ /** Sends a request and waits for a reply from the server. */
+ _sendRequestForReply(record, data) {
+ return Promise.resolve().then(_ => {
+ // start the timer since we now have at least one request
+ this._startRequestTimeoutTimer();
+
+ let key = this._makePendingRequestKey(data);
+ if (!this._pendingRequests.has(key)) {
+ let request = {
+ data,
+ record,
+ ctime: Date.now(),
+ };
+ request.promise = new Promise((resolve, reject) => {
+ request.resolve = resolve;
+ request.reject = reject;
+ });
+ this._pendingRequests.set(key, request);
+ this._queueRequest(data);
+ }
+
+ return this._pendingRequests.get(key).promise;
+ });
+ },
+
+ /** Removes and returns a pending request for a server reply. */
+ _takeRequestForReply(reply) {
+ if (typeof reply.channelID !== "string") {
+ return null;
+ }
+ let key = this._makePendingRequestKey(reply);
+ let request = this._pendingRequests.get(key);
+ if (!request) {
+ return null;
+ }
+ this._pendingRequests.delete(key);
+ if (!this._hasPendingRequests()) {
+ this._requestTimeoutTimer.cancel();
+ }
+ return request;
+ },
+
+ sendSubscribeBroadcast(serviceId, version) {
+ this._currentlyRegistering.add(serviceId);
+ let data = {
+ messageType: "broadcast_subscribe",
+ broadcasts: {
+ [serviceId]: version,
+ },
+ };
+
+ this._queueRequest(data);
+ },
+};
+
+function PushRecordWebSocket(record) {
+ PushRecord.call(this, record);
+ this.channelID = record.channelID;
+ this.version = record.version;
+}
+
+PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
+ keyID: {
+ get() {
+ return this.channelID;
+ },
+ },
+});
+
+PushRecordWebSocket.prototype.toSubscription = function() {
+ let subscription = PushRecord.prototype.toSubscription.call(this);
+ subscription.version = this.version;
+ return subscription;
+};