diff options
Diffstat (limited to 'dom/push/PushServiceWebSocket.jsm')
-rw-r--r-- | dom/push/PushServiceWebSocket.jsm | 1311 |
1 files changed, 1311 insertions, 0 deletions
diff --git a/dom/push/PushServiceWebSocket.jsm b/dom/push/PushServiceWebSocket.jsm new file mode 100644 index 0000000000..7dd8b347a2 --- /dev/null +++ b/dom/push/PushServiceWebSocket.jsm @@ -0,0 +1,1311 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm"); +const { XPCOMUtils } = ChromeUtils.import( + "resource://gre/modules/XPCOMUtils.jsm" +); + +const { PushDB } = ChromeUtils.import("resource://gre/modules/PushDB.jsm"); +const { PushRecord } = ChromeUtils.import( + "resource://gre/modules/PushRecord.jsm" +); +const { PushCrypto } = ChromeUtils.import( + "resource://gre/modules/PushCrypto.jsm" +); + +ChromeUtils.defineModuleGetter( + this, + "pushBroadcastService", + "resource://gre/modules/PushBroadcastService.jsm" +); +ChromeUtils.defineModuleGetter( + this, + "ObjectUtils", + "resource://gre/modules/ObjectUtils.jsm" +); + +const kPUSHWSDB_DB_NAME = "pushapi"; +const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes +const kPUSHWSDB_STORE_NAME = "pushapi"; + +// WebSocket close code sent by the server to indicate that the client should +// not automatically reconnect. +const kBACKOFF_WS_STATUS_CODE = 4774; + +// Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes +// included in request payloads. +const kACK_STATUS_TO_CODE = { + [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100, + [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101, + [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102, +}; + +const kUNREGISTER_REASON_TO_CODE = { + [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200, + [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201, + [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202, +}; + +const kDELIVERY_REASON_TO_CODE = { + [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301, + [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302, + [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303, +}; + +const prefs = Services.prefs.getBranch("dom.push."); + +const EXPORTED_SYMBOLS = ["PushServiceWebSocket"]; + +XPCOMUtils.defineLazyGetter(this, "console", () => { + let { ConsoleAPI } = ChromeUtils.import("resource://gre/modules/Console.jsm"); + return new ConsoleAPI({ + maxLogLevelPref: "dom.push.loglevel", + prefix: "PushServiceWebSocket", + }); +}); + +/** + * A proxy between the PushService and the WebSocket. The listener is used so + * that the PushService can silence messages from the WebSocket by setting + * PushWebSocketListener._pushService to null. This is required because + * a WebSocket can continue to send messages or errors after it has been + * closed but the PushService may not be interested in these. It's easier to + * stop listening than to have checks at specific points. + */ +var PushWebSocketListener = function(pushService) { + this._pushService = pushService; +}; + +PushWebSocketListener.prototype = { + onStart(context) { + if (!this._pushService) { + return; + } + this._pushService._wsOnStart(context); + }, + + onStop(context, statusCode) { + if (!this._pushService) { + return; + } + this._pushService._wsOnStop(context, statusCode); + }, + + onAcknowledge(context, size) { + // EMPTY + }, + + onBinaryMessageAvailable(context, message) { + // EMPTY + }, + + onMessageAvailable(context, message) { + if (!this._pushService) { + return; + } + this._pushService._wsOnMessageAvailable(context, message); + }, + + onServerClose(context, aStatusCode, aReason) { + if (!this._pushService) { + return; + } + this._pushService._wsOnServerClose(context, aStatusCode, aReason); + }, +}; + +// websocket states +// websocket is off +const STATE_SHUT_DOWN = 0; +// Websocket has been opened on client side, waiting for successful open. +// (_wsOnStart) +const STATE_WAITING_FOR_WS_START = 1; +// Websocket opened, hello sent, waiting for server reply (_handleHelloReply). +const STATE_WAITING_FOR_HELLO = 2; +// Websocket operational, handshake completed, begin protocol messaging. +const STATE_READY = 3; + +var PushServiceWebSocket = { + _mainPushService: null, + _serverURI: null, + _currentlyRegistering: new Set(), + + newPushDB() { + return new PushDB( + kPUSHWSDB_DB_NAME, + kPUSHWSDB_DB_VERSION, + kPUSHWSDB_STORE_NAME, + "channelID", + PushRecordWebSocket + ); + }, + + disconnect() { + this._shutdownWS(); + }, + + observe(aSubject, aTopic, aData) { + if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") { + this._onUAIDChanged(); + } else if (aTopic == "timer-callback") { + this._onTimerFired(aSubject); + } + }, + + /** + * Handles a UAID change. Unlike reconnects, we cancel all pending requests + * after disconnecting. Existing subscriptions stored in IndexedDB will be + * dropped on reconnect. + */ + _onUAIDChanged() { + console.debug("onUAIDChanged()"); + + this._shutdownWS(); + this._startBackoffTimer(); + }, + + /** Handles a ping, backoff, or request timeout timer event. */ + _onTimerFired(timer) { + console.debug("onTimerFired()"); + + if (timer == this._pingTimer) { + this._sendPing(); + return; + } + + if (timer == this._backoffTimer) { + console.debug("onTimerFired: Reconnecting after backoff"); + this._beginWSSetup(); + return; + } + + if (timer == this._requestTimeoutTimer) { + this._timeOutRequests(); + } + }, + + /** + * Sends a ping to the server. Bypasses the request queue, but starts the + * request timeout timer. If the socket is already closed, or the server + * does not respond within the timeout, the client will reconnect. + */ + _sendPing() { + console.debug("sendPing()"); + + this._startRequestTimeoutTimer(); + try { + this._wsSendMessage({}); + this._lastPingTime = Date.now(); + } catch (e) { + console.debug("sendPing: Error sending ping", e); + this._reconnect(); + } + }, + + /** Times out any pending requests. */ + _timeOutRequests() { + console.debug("timeOutRequests()"); + + if (!this._hasPendingRequests()) { + // Cancel the repeating timer and exit early if we aren't waiting for + // pongs or requests. + this._requestTimeoutTimer.cancel(); + return; + } + + let now = Date.now(); + + // Set to true if at least one request timed out, or we're still waiting + // for a pong after the request timeout. + let requestTimedOut = false; + + if ( + this._lastPingTime > 0 && + now - this._lastPingTime > this._requestTimeout + ) { + console.debug("timeOutRequests: Did not receive pong in time"); + requestTimedOut = true; + } else { + for (let [key, request] of this._pendingRequests) { + let duration = now - request.ctime; + // If any of the registration requests time out, all the ones after it + // also made to fail, since we are going to be disconnecting the + // socket. + requestTimedOut |= duration > this._requestTimeout; + if (requestTimedOut) { + request.reject(new Error("Request timed out: " + key)); + this._pendingRequests.delete(key); + } + } + } + + // The most likely reason for a pong or registration request timing out is + // that the socket has disconnected. Best to reconnect. + if (requestTimedOut) { + this._reconnect(); + } + }, + + get _UAID() { + return prefs.getStringPref("userAgentID"); + }, + + set _UAID(newID) { + if (typeof newID !== "string") { + console.warn( + "Got invalid, non-string UAID", + newID, + "Not updating userAgentID" + ); + return; + } + console.debug("New _UAID", newID); + prefs.setStringPref("userAgentID", newID); + }, + + _ws: null, + _pendingRequests: new Map(), + _currentState: STATE_SHUT_DOWN, + _requestTimeout: 0, + _requestTimeoutTimer: null, + _retryFailCount: 0, + + /** + * According to the WS spec, servers should immediately close the underlying + * TCP connection after they close a WebSocket. This causes wsOnStop to be + * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the + * WebSocket up, it should try to reconnect. But if the server closes the + * WebSocket because it wants the client to back off, then the client + * shouldn't re-establish the connection. If the server sends the backoff + * close code, this field will be set to true in wsOnServerClose. It is + * checked in wsOnStop. + */ + _skipReconnect: false, + + /** Indicates whether the server supports Web Push-style message delivery. */ + _dataEnabled: false, + + /** + * The last time the client sent a ping to the server. If non-zero, keeps the + * request timeout timer active. Reset to zero when the server responds with + * a pong or pending messages. + */ + _lastPingTime: 0, + + /** + * A one-shot timer used to ping the server, to avoid timing out idle + * connections. Reset to the ping interval on each incoming message. + */ + _pingTimer: null, + + /** A one-shot timer fired after the reconnect backoff period. */ + _backoffTimer: null, + + /** + * Sends a message to the Push Server through an open websocket. + * typeof(msg) shall be an object + */ + _wsSendMessage(msg) { + if (!this._ws) { + console.warn( + "wsSendMessage: No WebSocket initialized.", + "Cannot send a message" + ); + return; + } + msg = JSON.stringify(msg); + console.debug("wsSendMessage: Sending message", msg); + this._ws.sendMsg(msg); + }, + + init(options, mainPushService, serverURI) { + console.debug("init()"); + + this._mainPushService = mainPushService; + this._serverURI = serverURI; + // Filled in at connect() time + this._broadcastListeners = null; + + // Override the default WebSocket factory function. The returned object + // must be null or satisfy the nsIWebSocketChannel interface. Used by + // the tests to provide a mock WebSocket implementation. + if (options.makeWebSocket) { + this._makeWebSocket = options.makeWebSocket; + } + + this._requestTimeout = prefs.getIntPref("requestTimeout"); + + return Promise.resolve(); + }, + + _reconnect() { + console.debug("reconnect()"); + this._shutdownWS(false); + this._startBackoffTimer(); + }, + + _shutdownWS(shouldCancelPending = true) { + console.debug("shutdownWS()"); + + if (this._currentState == STATE_READY) { + prefs.removeObserver("userAgentID", this); + } + + this._currentState = STATE_SHUT_DOWN; + this._skipReconnect = false; + + if (this._wsListener) { + this._wsListener._pushService = null; + } + try { + this._ws.close(0, null); + } catch (e) {} + this._ws = null; + + this._lastPingTime = 0; + + if (this._pingTimer) { + this._pingTimer.cancel(); + } + + if (shouldCancelPending) { + this._cancelPendingRequests(); + } + + if (this._notifyRequestQueue) { + this._notifyRequestQueue(); + this._notifyRequestQueue = null; + } + }, + + uninit() { + // All pending requests (ideally none) are dropped at this point. We + // shouldn't have any applications performing registration/unregistration + // or receiving notifications. + this._shutdownWS(); + + if (this._backoffTimer) { + this._backoffTimer.cancel(); + } + if (this._requestTimeoutTimer) { + this._requestTimeoutTimer.cancel(); + } + + this._mainPushService = null; + + this._dataEnabled = false; + }, + + /** + * How retries work: If the WS is closed due to a socket error, + * _startBackoffTimer() is called. The retry timer is started and when + * it times out, beginWSSetup() is called again. + * + * If we are in the middle of a timeout (i.e. waiting), but + * a register/unregister is called, we don't want to wait around anymore. + * _sendRequest will automatically call beginWSSetup(), which will cancel the + * timer. In addition since the state will have changed, even if a pending + * timer event comes in (because the timer fired the event before it was + * cancelled), so the connection won't be reset. + */ + _startBackoffTimer() { + console.debug("startBackoffTimer()"); + + // Calculate new timeout, but cap it to pingInterval. + let retryTimeout = + prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount); + retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval")); + + this._retryFailCount++; + + console.debug( + "startBackoffTimer: Retry in", + retryTimeout, + "Try number", + this._retryFailCount + ); + + if (!this._backoffTimer) { + this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance( + Ci.nsITimer + ); + } + this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT); + }, + + /** Indicates whether we're waiting for pongs or requests. */ + _hasPendingRequests() { + return this._lastPingTime > 0 || this._pendingRequests.size > 0; + }, + + /** + * Starts the request timeout timer unless we're already waiting for a pong + * or register request. + */ + _startRequestTimeoutTimer() { + if (this._hasPendingRequests()) { + return; + } + if (!this._requestTimeoutTimer) { + this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance( + Ci.nsITimer + ); + } + this._requestTimeoutTimer.init( + this, + this._requestTimeout, + Ci.nsITimer.TYPE_REPEATING_SLACK + ); + }, + + /** Starts or resets the ping timer. */ + _startPingTimer() { + if (!this._pingTimer) { + this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer); + } + this._pingTimer.init( + this, + prefs.getIntPref("pingInterval"), + Ci.nsITimer.TYPE_ONE_SHOT + ); + }, + + _makeWebSocket(uri) { + if (!prefs.getBoolPref("connection.enabled")) { + console.warn( + "makeWebSocket: connection.enabled is not set to true.", + "Aborting." + ); + return null; + } + if (Services.io.offline) { + console.warn("makeWebSocket: Network is offline."); + return null; + } + let contractId = + uri.scheme == "ws" + ? "@mozilla.org/network/protocol;1?name=ws" + : "@mozilla.org/network/protocol;1?name=wss"; + let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel); + + socket.initLoadInfo( + null, // aLoadingNode + Services.scriptSecurityManager.getSystemPrincipal(), + null, // aTriggeringPrincipal + Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL, + Ci.nsIContentPolicy.TYPE_WEBSOCKET + ); + // Allow deprecated HTTP request from SystemPrincipal + socket.loadInfo.allowDeprecatedSystemRequests = true; + + return socket; + }, + + _beginWSSetup() { + console.debug("beginWSSetup()"); + if (this._currentState != STATE_SHUT_DOWN) { + console.error( + "_beginWSSetup: Not in shutdown state! Current state", + this._currentState + ); + return; + } + + // Stop any pending reconnects scheduled for the near future. + if (this._backoffTimer) { + this._backoffTimer.cancel(); + } + + let uri = this._serverURI; + if (!uri) { + return; + } + let socket = this._makeWebSocket(uri); + if (!socket) { + return; + } + this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel); + + console.debug("beginWSSetup: Connecting to", uri.spec); + this._wsListener = new PushWebSocketListener(this); + this._ws.protocol = "push-notification"; + + try { + // Grab a wakelock before we open the socket to ensure we don't go to + // sleep before connection the is opened. + this._ws.asyncOpen(uri, uri.spec, 0, this._wsListener, null); + this._currentState = STATE_WAITING_FOR_WS_START; + } catch (e) { + console.error( + "beginWSSetup: Error opening websocket.", + "asyncOpen failed", + e + ); + this._reconnect(); + } + }, + + connect(broadcastListeners) { + console.debug("connect()", broadcastListeners); + this._broadcastListeners = broadcastListeners; + this._beginWSSetup(); + }, + + isConnected() { + return !!this._ws; + }, + + /** + * Protocol handler invoked by server message. + */ + _handleHelloReply(reply) { + console.debug("handleHelloReply()"); + if (this._currentState != STATE_WAITING_FOR_HELLO) { + console.error( + "handleHelloReply: Unexpected state", + this._currentState, + "(expected STATE_WAITING_FOR_HELLO)" + ); + this._shutdownWS(); + return; + } + + if (typeof reply.uaid !== "string") { + console.error("handleHelloReply: Received invalid UAID", reply.uaid); + this._shutdownWS(); + return; + } + + if (reply.uaid === "") { + console.error("handleHelloReply: Received empty UAID"); + this._shutdownWS(); + return; + } + + // To avoid sticking extra large values sent by an evil server into prefs. + if (reply.uaid.length > 128) { + console.error( + "handleHelloReply: UAID received from server was too long", + reply.uaid + ); + this._shutdownWS(); + return; + } + + let sendRequests = () => { + if (this._notifyRequestQueue) { + this._notifyRequestQueue(); + this._notifyRequestQueue = null; + } + this._sendPendingRequests(); + }; + + function finishHandshake() { + this._UAID = reply.uaid; + this._currentState = STATE_READY; + prefs.addObserver("userAgentID", this); + + // Handle broadcasts received in response to the "hello" message. + if (!ObjectUtils.isEmpty(reply.broadcasts)) { + // The reply isn't technically a broadcast message, but it has + // the shape of a broadcast message (it has a broadcasts field). + const context = { phase: pushBroadcastService.PHASES.HELLO }; + this._mainPushService.receivedBroadcastMessage(reply, context); + } + + this._dataEnabled = !!reply.use_webpush; + if (this._dataEnabled) { + this._mainPushService + .getAllUnexpired() + .then(records => + Promise.all( + records.map(record => + this._mainPushService.ensureCrypto(record).catch(error => { + console.error( + "finishHandshake: Error updating record", + record.keyID, + error + ); + }) + ) + ) + ) + .then(sendRequests); + } else { + sendRequests(); + } + } + + // By this point we've got a UAID from the server that we are ready to + // accept. + // + // We unconditionally drop all existing registrations and notify service + // workers if we receive a new UAID. This ensures we expunge all stale + // registrations if the `userAgentID` pref is reset. + if (this._UAID != reply.uaid) { + console.debug("handleHelloReply: Received new UAID"); + + this._mainPushService + .dropUnexpiredRegistrations() + .then(finishHandshake.bind(this)); + + return; + } + + // otherwise we are good to go + finishHandshake.bind(this)(); + }, + + /** + * Protocol handler invoked by server message. + */ + _handleRegisterReply(reply) { + console.debug("handleRegisterReply()"); + + let tmp = this._takeRequestForReply(reply); + if (!tmp) { + return; + } + + if (reply.status == 200) { + try { + Services.io.newURI(reply.pushEndpoint); + } catch (e) { + tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint)); + return; + } + + let record = new PushRecordWebSocket({ + channelID: reply.channelID, + pushEndpoint: reply.pushEndpoint, + scope: tmp.record.scope, + originAttributes: tmp.record.originAttributes, + version: null, + systemRecord: tmp.record.systemRecord, + appServerKey: tmp.record.appServerKey, + ctime: Date.now(), + }); + tmp.resolve(record); + } else { + console.error("handleRegisterReply: Unexpected server response", reply); + tmp.reject( + new Error("Wrong status code for register reply: " + reply.status) + ); + } + }, + + _handleUnregisterReply(reply) { + console.debug("handleUnregisterReply()"); + + let request = this._takeRequestForReply(reply); + if (!request) { + return; + } + + let success = reply.status === 200; + request.resolve(success); + }, + + _handleDataUpdate(update) { + let promise; + if (typeof update.channelID != "string") { + console.warn( + "handleDataUpdate: Discarding update without channel ID", + update + ); + return; + } + function updateRecord(record) { + // Ignore messages that we've already processed. This can happen if the + // connection drops between notifying the service worker and acking the + // the message. In that case, the server will re-send the message on + // reconnect. + if (record.hasRecentMessageID(update.version)) { + console.warn( + "handleDataUpdate: Ignoring duplicate message", + update.version + ); + return null; + } + record.noteRecentMessageID(update.version); + return record; + } + if (typeof update.data != "string") { + promise = this._mainPushService.receivedPushMessage( + update.channelID, + update.version, + null, + null, + updateRecord + ); + } else { + let message = ChromeUtils.base64URLDecode(update.data, { + // The Push server may append padding. + padding: "ignore", + }); + promise = this._mainPushService.receivedPushMessage( + update.channelID, + update.version, + update.headers, + message, + updateRecord + ); + } + promise + .then( + status => { + this._sendAck(update.channelID, update.version, status); + }, + err => { + console.error( + "handleDataUpdate: Error delivering message", + update, + err + ); + this._sendAck( + update.channelID, + update.version, + Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR + ); + } + ) + .catch(err => { + console.error( + "handleDataUpdate: Error acknowledging message", + update, + err + ); + }); + }, + + /** + * Protocol handler invoked by server message. + */ + _handleNotificationReply(reply) { + console.debug("handleNotificationReply()"); + if (this._dataEnabled) { + this._handleDataUpdate(reply); + return; + } + + if (typeof reply.updates !== "object") { + console.warn("handleNotificationReply: Missing updates", reply.updates); + return; + } + + console.debug("handleNotificationReply: Got updates", reply.updates); + for (let i = 0; i < reply.updates.length; i++) { + let update = reply.updates[i]; + console.debug("handleNotificationReply: Handling update", update); + if (typeof update.channelID !== "string") { + console.debug( + "handleNotificationReply: Invalid update at index", + i, + update + ); + continue; + } + + if (update.version === undefined) { + console.debug("handleNotificationReply: Missing version", update); + continue; + } + + let version = update.version; + + if (typeof version === "string") { + version = parseInt(version, 10); + } + + if (typeof version === "number" && version >= 0) { + // FIXME(nsm): this relies on app update notification being infallible! + // eventually fix this + this._receivedUpdate(update.channelID, version); + } + } + }, + + _handleBroadcastReply(reply) { + let phase = pushBroadcastService.PHASES.BROADCAST; + // Check if this reply is the result of registration. + for (const id of Object.keys(reply.broadcasts)) { + const wasRegistering = this._currentlyRegistering.delete(id); + if (wasRegistering) { + // If we get multiple broadcasts and only one is "registering", + // then we consider the phase to be REGISTER for all of them. + // It is acceptable since registrations do not happen so often, + // and are all very likely to occur soon after browser startup. + phase = pushBroadcastService.PHASES.REGISTER; + } + } + const context = { phase }; + this._mainPushService.receivedBroadcastMessage(reply, context); + }, + + reportDeliveryError(messageID, reason) { + console.debug("reportDeliveryError()"); + let code = kDELIVERY_REASON_TO_CODE[reason]; + if (!code) { + throw new Error("Invalid delivery error reason"); + } + let data = { messageType: "nack", version: messageID, code }; + this._queueRequest(data); + }, + + _sendAck(channelID, version, status) { + console.debug("sendAck()"); + let code = kACK_STATUS_TO_CODE[status]; + if (!code) { + throw new Error("Invalid ack status"); + } + let data = { messageType: "ack", updates: [{ channelID, version, code }] }; + this._queueRequest(data); + }, + + _generateID() { + let uuidGenerator = Cc["@mozilla.org/uuid-generator;1"].getService( + Ci.nsIUUIDGenerator + ); + // generateUUID() gives a UUID surrounded by {...}, slice them off. + return uuidGenerator + .generateUUID() + .toString() + .slice(1, -1); + }, + + register(record) { + console.debug("register() ", record); + + let data = { channelID: this._generateID(), messageType: "register" }; + + if (record.appServerKey) { + data.key = ChromeUtils.base64URLEncode(record.appServerKey, { + // The Push server requires padding. + pad: true, + }); + } + + return this._sendRequestForReply(record, data).then(record => { + if (!this._dataEnabled) { + return record; + } + return PushCrypto.generateKeys().then(([publicKey, privateKey]) => { + record.p256dhPublicKey = publicKey; + record.p256dhPrivateKey = privateKey; + record.authenticationSecret = PushCrypto.generateAuthenticationSecret(); + return record; + }); + }); + }, + + unregister(record, reason) { + console.debug("unregister() ", record, reason); + + return Promise.resolve().then(_ => { + let code = kUNREGISTER_REASON_TO_CODE[reason]; + if (!code) { + throw new Error("Invalid unregister reason"); + } + let data = { + channelID: record.channelID, + messageType: "unregister", + code, + }; + + return this._sendRequestForReply(record, data); + }); + }, + + _queueStart: Promise.resolve(), + _notifyRequestQueue: null, + _queue: null, + _enqueue(op) { + console.debug("enqueue()"); + if (!this._queue) { + this._queue = this._queueStart; + } + this._queue = this._queue.then(op).catch(_ => {}); + }, + + /** Sends a request to the server. */ + _send(data) { + if (this._currentState != STATE_READY) { + console.warn( + "send: Unexpected state; ignoring message", + this._currentState + ); + return; + } + if (!this._requestHasReply(data)) { + this._wsSendMessage(data); + return; + } + // If we're expecting a reply, check that we haven't cancelled the request. + let key = this._makePendingRequestKey(data); + if (!this._pendingRequests.has(key)) { + console.log("send: Request cancelled; ignoring message", key); + return; + } + this._wsSendMessage(data); + }, + + /** Indicates whether a request has a corresponding reply from the server. */ + _requestHasReply(data) { + return data.messageType == "register" || data.messageType == "unregister"; + }, + + /** + * Sends all pending requests that expect replies. Called after the connection + * is established and the handshake is complete. + */ + _sendPendingRequests() { + this._enqueue(_ => { + for (let request of this._pendingRequests.values()) { + this._send(request.data); + } + }); + }, + + /** Queues an outgoing request, establishing a connection if necessary. */ + _queueRequest(data) { + console.debug("queueRequest()", data); + + if (this._currentState == STATE_READY) { + // If we're ready, no need to queue; just send the request. + this._send(data); + return; + } + + // Otherwise, we're still setting up. If we don't have a request queue, + // make one now. + if (!this._notifyRequestQueue) { + let promise = new Promise((resolve, reject) => { + this._notifyRequestQueue = resolve; + }); + this._enqueue(_ => promise); + } + + let isRequest = this._requestHasReply(data); + if (!isRequest) { + // Don't queue requests, since they're stored in `_pendingRequests`, and + // `_sendPendingRequests` will send them after reconnecting. Without this + // check, we'd send requests twice. + this._enqueue(_ => this._send(data)); + } + + if (!this._ws) { + // This will end up calling notifyRequestQueue(). + this._beginWSSetup(); + // If beginWSSetup does not succeed to make ws, notifyRequestQueue will + // not be call. + if (!this._ws && this._notifyRequestQueue) { + this._notifyRequestQueue(); + this._notifyRequestQueue = null; + } + } + }, + + _receivedUpdate(aChannelID, aLatestVersion) { + console.debug("receivedUpdate: Updating", aChannelID, "->", aLatestVersion); + + this._mainPushService + .receivedPushMessage(aChannelID, "", null, null, record => { + if (record.version === null || record.version < aLatestVersion) { + console.debug( + "receivedUpdate: Version changed for", + aChannelID, + aLatestVersion + ); + record.version = aLatestVersion; + return record; + } + console.debug( + "receivedUpdate: No significant version change for", + aChannelID, + aLatestVersion + ); + return null; + }) + .then(status => { + this._sendAck(aChannelID, aLatestVersion, status); + }) + .catch(err => { + console.error( + "receivedUpdate: Error acknowledging message", + aChannelID, + aLatestVersion, + err + ); + }); + }, + + // begin Push protocol handshake + _wsOnStart(context) { + console.debug("wsOnStart()"); + + if (this._currentState != STATE_WAITING_FOR_WS_START) { + console.error( + "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current", + "state", + this._currentState, + "Skipping" + ); + return; + } + + this._mainPushService + .getAllUnexpired() + .then( + records => this._sendHello(records), + err => { + console.warn( + "Error fetching existing records before handshake; assuming none", + err + ); + this._sendHello([]); + } + ) + .catch(err => { + // If we failed to send the handshake, back off and reconnect. + console.warn("Failed to send handshake; reconnecting", err); + this._reconnect(); + }); + }, + + /** + * Sends a `hello` handshake to the server. + * + * @param {Array<PushRecordWebSocket>} An array of records for existing + * subscriptions, used to determine whether to rotate our UAID. + */ + _sendHello(records) { + let data = { + messageType: "hello", + broadcasts: this._broadcastListeners, + use_webpush: true, + }; + + if (records.length && this._UAID) { + // Only send our UAID if we have existing push subscriptions, to + // avoid tying a persistent identifier to the connection (bug + // 1617136). The push server will issue our client a new UAID in + // the `hello` response, which we'll store until either the next + // time we reconnect, or the user subscribes to push. Once we have a + // push subscription, we'll stop rotating the UAID when we connect, + // so that we can receive push messages for them. + data.uaid = this._UAID; + } + + this._wsSendMessage(data); + this._currentState = STATE_WAITING_FOR_HELLO; + }, + + /** + * This statusCode is not the websocket protocol status code, but the TCP + * connection close status code. + * + * If we do not explicitly call ws.close() then statusCode is always + * NS_BASE_STREAM_CLOSED, even on a successful close. + */ + _wsOnStop(context, statusCode) { + console.debug("wsOnStop()"); + + if (statusCode != Cr.NS_OK && !this._skipReconnect) { + console.debug("wsOnStop: Reconnecting after socket error", statusCode); + this._reconnect(); + return; + } + + this._shutdownWS(); + }, + + _wsOnMessageAvailable(context, message) { + console.debug("wsOnMessageAvailable()", message); + + // Clearing the last ping time indicates we're no longer waiting for a pong. + this._lastPingTime = 0; + + let reply; + try { + reply = JSON.parse(message); + } catch (e) { + console.warn("wsOnMessageAvailable: Invalid JSON", message, e); + return; + } + + // If we receive a message, we know the connection succeeded. Reset the + // connection attempt and ping interval counters. + this._retryFailCount = 0; + + let doNotHandle = false; + if ( + message === "{}" || + reply.messageType === undefined || + reply.messageType === "ping" || + typeof reply.messageType != "string" + ) { + console.debug("wsOnMessageAvailable: Pong received"); + doNotHandle = true; + } + + // Reset the ping timer. Note: This path is executed at every step of the + // handshake, so this timer does not need to be set explicitly at startup. + this._startPingTimer(); + + // If it is a ping, do not handle the message. + if (doNotHandle) { + return; + } + + // A whitelist of protocol handlers. Add to these if new messages are added + // in the protocol. + let handlers = [ + "Hello", + "Register", + "Unregister", + "Notification", + "Broadcast", + ]; + + // Build up the handler name to call from messageType. + // e.g. messageType == "register" -> _handleRegisterReply. + let handlerName = + reply.messageType[0].toUpperCase() + + reply.messageType.slice(1).toLowerCase(); + + if (!handlers.includes(handlerName)) { + console.warn( + "wsOnMessageAvailable: No whitelisted handler", + handlerName, + "for message", + reply.messageType + ); + return; + } + + let handler = "_handle" + handlerName + "Reply"; + + if (typeof this[handler] !== "function") { + console.warn( + "wsOnMessageAvailable: Handler", + handler, + "whitelisted but not implemented" + ); + return; + } + + this[handler](reply); + }, + + /** + * The websocket should never be closed. Since we don't call ws.close(), + * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that + * function), which calls reconnect and re-establishes the WebSocket + * connection. + * + * If the server requested that we back off, we won't reconnect until the + * next network state change event, or until we need to send a new register + * request. + */ + _wsOnServerClose(context, aStatusCode, aReason) { + console.debug("wsOnServerClose()", aStatusCode, aReason); + + if (aStatusCode == kBACKOFF_WS_STATUS_CODE) { + console.debug("wsOnServerClose: Skipping automatic reconnect"); + this._skipReconnect = true; + } + }, + + /** + * Rejects all pending register requests with errors. + */ + _cancelPendingRequests() { + for (let request of this._pendingRequests.values()) { + request.reject(new Error("Request aborted")); + } + this._pendingRequests.clear(); + }, + + /** Creates a case-insensitive map key for a request that expects a reply. */ + _makePendingRequestKey(data) { + return (data.messageType + "|" + data.channelID).toLowerCase(); + }, + + /** Sends a request and waits for a reply from the server. */ + _sendRequestForReply(record, data) { + return Promise.resolve().then(_ => { + // start the timer since we now have at least one request + this._startRequestTimeoutTimer(); + + let key = this._makePendingRequestKey(data); + if (!this._pendingRequests.has(key)) { + let request = { + data, + record, + ctime: Date.now(), + }; + request.promise = new Promise((resolve, reject) => { + request.resolve = resolve; + request.reject = reject; + }); + this._pendingRequests.set(key, request); + this._queueRequest(data); + } + + return this._pendingRequests.get(key).promise; + }); + }, + + /** Removes and returns a pending request for a server reply. */ + _takeRequestForReply(reply) { + if (typeof reply.channelID !== "string") { + return null; + } + let key = this._makePendingRequestKey(reply); + let request = this._pendingRequests.get(key); + if (!request) { + return null; + } + this._pendingRequests.delete(key); + if (!this._hasPendingRequests()) { + this._requestTimeoutTimer.cancel(); + } + return request; + }, + + sendSubscribeBroadcast(serviceId, version) { + this._currentlyRegistering.add(serviceId); + let data = { + messageType: "broadcast_subscribe", + broadcasts: { + [serviceId]: version, + }, + }; + + this._queueRequest(data); + }, +}; + +function PushRecordWebSocket(record) { + PushRecord.call(this, record); + this.channelID = record.channelID; + this.version = record.version; +} + +PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, { + keyID: { + get() { + return this.channelID; + }, + }, +}); + +PushRecordWebSocket.prototype.toSubscription = function() { + let subscription = PushRecord.prototype.toSubscription.call(this); + subscription.version = this.version; + return subscription; +}; |