186 lines
6.7 KiB
Python
186 lines
6.7 KiB
Python
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
sys.path.append(os.fspath(Path(__file__).parents[3] / "quota/test/marionette"))
|
|
|
|
from quota_test_case import QuotaTestCase
|
|
|
|
# Name of the preference that the test case below forces to True for the
# duration of each test. Presumably it gates Cache API availability in
# Private Browsing windows -- TODO confirm against the pref definition.
CACHEAPI_PBM_PREF = "dom.cache.privateBrowsing.enabled"
|
|
# Name of the preference that the test case below forces to True for the
# duration of each test. Presumably it unlocks QuotaManager testing-only
# operations (such as resetting a client's storage) -- TODO confirm.
QM_TESTING_PREF = "dom.quotaManager.testing"
|
|
|
|
|
|
class CacheAPIEncryptionPBM(QuotaTestCase):
    """
    Bug 1856953: Ensure CacheAPI data gets encrypted in Private Browsing Mode.

    Both the sqlite database (including its write-ahead log) and the
    request/response body files are inspected on disk. The same checks run
    against a regular window as a control, where the plaintext markers are
    expected to be present, and against a private window, where they must
    not be.
    """

    def setUp(self):
        """Record the current pref values, enable the prefs the test needs
        and initialise the fixture data shared by both tests."""
        super().setUp()

        self.testHTML = "dom/cache/basicCacheAPI_PBM.html"
        self.cacheName = "CachePBMTest"
        self.profilePath = self.marionette.instance.profile.profile
        # Resolved lazily by getCacheAPIStoragePath() once the origin is known.
        self.cacheAPIStoragePath = None

        # Remember the original values so tearDown() can put them back.
        self.defaultCacheAPIPBMPref = self.marionette.get_pref(CACHEAPI_PBM_PREF)
        self.marionette.set_pref(CACHEAPI_PBM_PREF, True)

        self.defaultQMPrefValue = self.marionette.get_pref(QM_TESTING_PREF)
        self.marionette.set_pref(QM_TESTING_PREF, True)

        # Plaintext markers that are written into the cache and later searched
        # for in the on-disk files.
        self.cacheRequestStr = "https://example.com/"
        self.cacheResponseStr = "CacheAPIEncryptionPBM"

        self.cacheDBFileName = "caches.sqlite"
        self.cacheDBJournalFileName = "caches.sqlite-wal"

        self.dbCheckpointThresholdBytes = 512 * 1024

    def tearDown(self):
        """Restore the prefs changed in setUp(), then run the base tearDown.

        The prefs are restored first, while the test's own state is still
        intact; the base class tearDown runs even if restoring a pref fails.
        """
        try:
            self.marionette.set_pref(CACHEAPI_PBM_PREF, self.defaultCacheAPIPBMPref)
            self.marionette.set_pref(QM_TESTING_PREF, self.defaultQMPrefValue)
        finally:
            super().tearDown()

    def test_request_response_ondisk(self):
        """Control: in a regular window the plaintext must be found on disk."""
        with self.using_new_window(self.testHTML, private=False) as (
            self.origin,
            self.persistenceType,
        ):
            self.runAndValidate(
                lambda exists: self.assertTrue(
                    exists, "Failed to find expected data on disk"
                )
            )

    def test_encrypted_request_response_ondisk(self):
        """In a private window the plaintext must not be found on disk."""
        with self.using_new_window(self.testHTML, private=True) as (
            self.origin,
            self.persistenceType,
        ):
            self.runAndValidate(
                lambda exists: self.assertFalse(exists, "Data on disk is not encrypted")
            )

    def runAndValidate(self, validator):
        """Store one request/response pair in the cache and check the disk.

        `validator` is called with a single bool: True when a plaintext
        marker was found in the inspected file, False otherwise. It is
        expected to assert on that value.
        """
        self.marionette.execute_async_script(
            """
            const [name, requestStr, responseStr, resolve] = arguments;

            const request = new Request(requestStr);
            const response = new Response(responseStr);
            window.wrappedJSObject.addDataIntoCache(name, request, response)
                                  .then(resolve);
            """,
            script_args=(
                self.cacheName,
                self.cacheRequestStr,
                self.cacheResponseStr,
            ),
        )

        # Wait for the origin's cache directory to show up on disk.
        self.ensureInvariantHolds(
            lambda _: os.path.exists(self.getCacheAPIStoragePath())
        )

        self.validateSqlite(validator)
        self.validateBodyFile(validator)

    def validateBodyFile(self, validator):
        """Locate the finalized response body file and report to `validator`
        whether it contains the snappy stream header in plaintext."""
        # Ensure response bodies have been flushed to the disk
        cacheResponseDir = self._waitForDirObj(
            self.getCacheAPIStoragePath(), "morgue", False
        )

        self.ensureInvariantHolds(lambda _: any(os.listdir(cacheResponseDir)))

        # Get the response bodies directory corresponding to the cache
        # 'self.cacheName'. Since there's only one cache object in this origin,
        # it must be the first one.
        cacheResponseBodiesPath = [
            d for d in Path(cacheResponseDir).iterdir() if d.is_dir()
        ][0]

        # Ensure bodies have been transferred to '.final' from '.tmp'
        cacheResponseBodyPath = self._waitForDirObj(
            cacheResponseBodiesPath, ".final", True
        )

        # A cache response gets compressed using snappy, so an unencrypted
        # response body file contains 'sNaPpY' as a compression header. Check
        # that 'sNaPpY' does not exist if bodies are getting encrypted.
        validator(self._fileContains(cacheResponseBodyPath, b"sNaPpY"))

    def validateSqlite(self, validator):
        """Report to `validator` whether the request URL is visible in the
        sqlite write-ahead log and, after a storage reset, in the main
        database file."""
        storagePath = self.getCacheAPIStoragePath()
        dbJournalFile = self._waitForDirObj(
            storagePath, self.cacheDBJournalFileName, True
        )
        dbFile = self._waitForDirObj(storagePath, self.cacheDBFileName, True)

        requestBytes = self.cacheRequestStr.encode("ascii")

        # Confirm the journal file size is less than 512KB, which ensures that
        # a checkpoint has not happened yet
        # (dom/cache/DBSchema.cpp::InitializeConnection, kWalAutoCheckpointPages)
        self.assertLess(
            os.path.getsize(dbJournalFile), self.dbCheckpointThresholdBytes
        )

        # Before checkpointing, the journal file should be larger than the
        # main sqlite db file.
        self.assertGreater(os.path.getsize(dbJournalFile), os.path.getsize(dbFile))

        validator(self._fileContains(dbJournalFile, requestBytes))

        # NOTE(review): resetting the client's storage presumably closes the
        # connection and checkpoints the journal into the main db file; the
        # size assertion below relies on that -- confirm.
        self.assertTrue(
            self.resetStoragesForClient(self.persistenceType, self.origin, "cache")
        )

        self.assertLessEqual(
            os.path.getsize(dbJournalFile), os.path.getsize(dbFile)
        )

        validator(self._fileContains(dbFile, requestBytes))

    def getCacheAPIStoragePath(self):
        """Return the on-disk cache directory for the current origin.

        The path is computed on first use and memoized for the rest of the
        test; self.origin and self.persistenceType must already be set.
        """
        if self.cacheAPIStoragePath is not None:
            return self.cacheAPIStoragePath

        assert self.origin is not None
        assert self.persistenceType is not None

        self.cacheAPIStoragePath = self.getStoragePath(
            self.profilePath, self.origin, self.persistenceType, "cache"
        )

        print("cacheAPI origin directory = " + self.cacheAPIStoragePath)
        return self.cacheAPIStoragePath

    def _waitForDirObj(self, path, pattern, isFile):
        """Wait until findDirObj(path, pattern, isFile) yields a result and
        return that result."""
        self.ensureInvariantHolds(
            lambda _: self.findDirObj(path, pattern, isFile) is not None
        )
        return self.findDirObj(path, pattern, isFile)

    @staticmethod
    def _fileContains(path, needle):
        """Return True if the bytes `needle` occur in the file at `path`.

        Path.read_bytes() closes the file before returning, so no handle is
        left open.
        """
        return needle in Path(path).read_bytes()
|