1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import json
import os
import re
import shutil
import tarfile
import tempfile
import unittest
import responses
from mozprofile.prefs import Preferences
from condprof.client import ROOT_URL, TC_SERVICE, get_profile
from condprof.util import _DEFAULT_SERVER
PROFILE = re.compile(ROOT_URL + "/.*/.*tgz")
PROFILE_FOR_TESTS = os.path.join(os.path.dirname(__file__), "profile")
SECRETS = re.compile(_DEFAULT_SERVER + "/.*")
SECRETS_PROXY = re.compile("http://taskcluster/secrets/.*")
class TestClient(unittest.TestCase):
def setUp(self):
self.profile_dir = tempfile.mkdtemp()
# creating profile.tgz on the fly for serving it
profile_tgz = os.path.join(self.profile_dir, "profile.tgz")
with tarfile.open(profile_tgz, "w:gz") as tar:
tar.add(PROFILE_FOR_TESTS, arcname=".")
# self.profile_data is the tarball we're sending back via HTTP
with open(profile_tgz, "rb") as f:
self.profile_data = f.read()
self.target = tempfile.mkdtemp()
self.download_dir = os.path.expanduser("~/.condprof-cache")
if os.path.exists(self.download_dir):
shutil.rmtree(self.download_dir)
responses.add(
responses.GET,
PROFILE,
body=self.profile_data,
headers={"content-length": str(len(self.profile_data)), "ETag": "'12345'"},
status=200,
)
responses.add(
responses.HEAD,
PROFILE,
body="",
headers={"content-length": str(len(self.profile_data)), "ETag": "'12345'"},
status=200,
)
responses.add(responses.HEAD, TC_SERVICE, body="", status=200)
secret = {"secret": {"username": "user", "password": "pass"}}
secret = json.dumps(secret)
for pattern in (SECRETS, SECRETS_PROXY):
responses.add(
responses.GET,
pattern,
body=secret,
headers={"content-length": str(len(secret))},
status=200,
)
def tearDown(self):
shutil.rmtree(self.target)
shutil.rmtree(self.download_dir)
shutil.rmtree(self.profile_dir)
@responses.activate
def test_cache(self):
download_dir = os.path.expanduser("~/.condprof-cache")
if os.path.exists(download_dir):
num_elmts = len(os.listdir(download_dir))
else:
num_elmts = 0
get_profile(self.target, "win64", "settled", "default")
# grabbing a profile should generate two files
self.assertEqual(len(os.listdir(download_dir)), num_elmts + 2)
# we do at least two network calls when getting a file,
# a HEAD and a GET and possibly a TC secret
self.assertTrue(len(responses.calls) >= 2)
# reseting the response counters
responses.calls.reset()
# and we should reuse them without downloading the file again
get_profile(self.target, "win64", "settled", "default")
# grabbing a profile should not download new stuff
self.assertEqual(len(os.listdir(download_dir)), num_elmts + 2)
# and do a single extra HEAD call, everything else is cached,
# even the TC secret
self.assertEqual(len(responses.calls), 2)
prefs_js = os.path.join(self.target, "prefs.js")
prefs = Preferences.read_prefs(prefs_js)
# check that the gfx.blacklist prefs where cleaned out
for name, value in prefs:
self.assertFalse(name.startswith("gfx.blacklist"))
# check that we have the startupScanScopes option forced
prefs = dict(prefs)
self.assertEqual(prefs["extensions.startupScanScopes"], 1)
# make sure we don't have any marionette option set
user_js = os.path.join(self.target, "user.js")
for name, value in Preferences.read_prefs(user_js):
self.assertFalse(name.startswith("marionette."))
if __name__ == "__main__":
try:
import mozunit
except ImportError:
pass
else:
mozunit.main(runwith="unittest")
|