194 lines
8.1 KiB
Python
194 lines
8.1 KiB
Python
import json
|
|
|
|
# Registry of per-test state: maps a test id (the decimal string handed out by
# initialize_test) to the SessionManager created for that test.
test_to_session_manager_mapping = {}
|
|
|
|
def initialize_test():
    """Register a fresh SessionManager and return the id it was stored under.

    The id is the number of managers registered so far, rendered as a
    string, so successive calls yield "0", "1", "2", ...
    """
    new_test_id = str(len(test_to_session_manager_mapping))
    manager = SessionManager()
    test_to_session_manager_mapping[new_test_id] = manager
    return new_test_id
|
|
|
|
def find_for_request(request):
    """Return the SessionManager registered for the test that sent *request*.

    The test is identified by the ``test_id`` cookie: it is looked up in
    ``request.cookies`` under the bytes key ``b'test_id'`` and its ``value``
    is decoded as UTF-8 to obtain the registry key.

    Args:
        request: The incoming request object; it must expose ``cookies``
            with a ``get`` method (presumably a wptserve request -- confirm
            against the handlers that call this).

    Returns:
        The manager stored in ``test_to_session_manager_mapping`` for the
        decoded test id.

    Raises:
        Exception: If the request carries no ``test_id`` cookie, or if no
            manager has been registered under that id.
    """
    cookie = request.cookies.get(b'test_id')
    if cookie is None:
        # Fail with a descriptive error instead of an AttributeError on
        # ``None.value`` when the cookie was never set.
        raise Exception("Could not find test_id cookie on request")

    test_id = cookie.value.decode('utf-8')
    manager = test_to_session_manager_mapping.get(test_id)
    if manager is None:
        raise Exception(f"Could not find manager for test_id: {test_id}")
    return manager
|
|
|
|
class CookieDetail:
    """Describe one cookie: its ``name=value`` pair and its attribute string.

    Either part may be left as ``None``, in which case the accessor methods
    fall back to a built-in default.
    """

    def __init__(self, name_and_value=None, attributes=None):
        """Store the optional ``name=value`` string and attribute string."""
        self.name_and_value = name_and_value
        self.attributes = attributes

    def get_name_and_value(self):
        """Return the configured ``name=value`` string, or the default one."""
        configured = self.name_and_value
        return "auth_cookie=abcdef0123" if configured is None else configured

    def get_attributes(self, request):
        """Return the configured attributes, or defaults built from *request*.

        The default scopes the cookie to the request's hostname
        (``request.url_parts.hostname``) and to the
        ``/device-bound-session-credentials`` path. *request* is only read
        when no attributes were configured.
        """
        if self.attributes is not None:
            return self.attributes
        return f"Domain={request.url_parts.hostname}; Path=/device-bound-session-credentials"
|
|
|
|
class SessionManager:
    """Hold the server-side state for the sessions of a single test.

    The manager records the sessions created so far (session id -> key),
    the options pushed in through ``configure_state_for_test``, and builds
    the session instructions response from that state.
    """

    def __init__(self):
        """Start with no sessions and every option at its default."""
        # Per-session bookkeeping, keyed by the integer session id.
        self.session_to_key_map = {}
        self.session_to_cookie_details_map = {}
        self.session_to_early_challenge_map = {}

        # Options that configure_state_for_test can override.
        self.should_refresh_end_session = False
        self.authorization_value = None
        self.scope_origin = None
        self.registration_sends_challenge = False
        self.cookie_details = None
        self.scope_specification_items = []
        self.refresh_sends_challenge = True
        self.refresh_url = "/device-bound-session-credentials/refresh_session.py"
        self.include_site = True

        # Reported back through pull_server_state.
        self.has_called_refresh = False

    def next_session_id(self):
        """Return the id the next created session will receive."""
        return len(self.session_to_key_map)

    def create_new_session(self):
        """Reserve the next session id (with no key yet) and return it."""
        new_id = self.next_session_id()
        self.session_to_key_map[new_id] = None
        return new_id

    def set_session_key(self, session_id, key):
        """Attach *key* to an existing session.

        Returns True when the session exists and was updated, False when
        *session_id* is unknown (nothing is stored in that case).
        """
        is_known = session_id in self.session_to_key_map
        if is_known:
            self.session_to_key_map[session_id] = key
        return is_known

    def get_session_key(self, session_id):
        """Return the key stored for *session_id*, or None if there is none."""
        return self.session_to_key_map.get(session_id)

    def get_session_ids(self):
        """Return a list of every session id created so far."""
        return list(self.session_to_key_map)

    def configure_state_for_test(self, configuration):
        """Apply the options present in the *configuration* mapping.

        Only keys whose value is not None are applied; everything else
        keeps its current value. Cookie-detail entries are converted into
        CookieDetail objects. Per-session overrides are keyed relative to
        the id the next created session will receive.
        """
        def adopt(option, attribute):
            # Copy a plain option onto the attribute of the same meaning.
            value = configuration.get(option)
            if value is not None:
                setattr(self, attribute, value)

        adopt("shouldRefreshEndSession", "should_refresh_end_session")
        adopt("authorizationValue", "authorization_value")
        adopt("scopeOrigin", "scope_origin")
        adopt("registrationSendsChallenge", "registration_sends_challenge")

        general_details = configuration.get("cookieDetails")
        if general_details is not None:
            self.cookie_details = [
                CookieDetail(entry.get("nameAndValue"), entry.get("attributes"))
                for entry in general_details
            ]

        upcoming_details = configuration.get("cookieDetailsForNextRegisteredSessions")
        if upcoming_details is not None:
            # The n-th entry applies to the n-th session registered from now on.
            for target_id, entries in enumerate(upcoming_details, self.next_session_id()):
                self.session_to_cookie_details_map[target_id] = [
                    CookieDetail(entry.get("nameAndValue"), entry.get("attributes"))
                    for entry in entries
                ]

        early_challenge = configuration.get("earlyChallengeForNextRegisteredSession")
        if early_challenge is not None:
            self.session_to_early_challenge_map[self.next_session_id()] = early_challenge

        adopt("scopeSpecificationItems", "scope_specification_items")
        adopt("refreshSendsChallenge", "refresh_sends_challenge")
        adopt("refreshUrl", "refresh_url")
        adopt("includeSite", "include_site")

    def get_should_refresh_end_session(self):
        """Return the ``should_refresh_end_session`` option."""
        return self.should_refresh_end_session

    def get_authorization_value(self):
        """Return the configured authorization value (None by default)."""
        return self.authorization_value

    def get_registration_sends_challenge(self):
        """Return the ``registration_sends_challenge`` option."""
        return self.registration_sends_challenge

    def reset_registration_sends_challenge(self):
        """Switch the ``registration_sends_challenge`` option back off."""
        self.registration_sends_challenge = False

    def get_refresh_sends_challenge(self):
        """Return the ``refresh_sends_challenge`` option."""
        return self.refresh_sends_challenge

    def set_has_called_refresh(self, has_called_refresh):
        """Record the value later reported as ``hasCalledRefresh``."""
        self.has_called_refresh = has_called_refresh

    def pull_server_state(self):
        """Return the observable server state as a JSON-friendly dict."""
        return {"hasCalledRefresh": self.has_called_refresh}

    def get_cookie_details(self, session_id):
        """Return the CookieDetail list that applies to *session_id*.

        Precedence: the session-specific override, then the general
        override, then a single default CookieDetail.
        """
        session_override = self.session_to_cookie_details_map.get(session_id)
        if session_override is not None:
            return session_override
        if self.cookie_details is not None:
            return self.cookie_details
        return [CookieDetail()]

    def get_early_challenge(self, session_id):
        """Return the early challenge stored for *session_id*, if any."""
        return self.session_to_early_challenge_map.get(session_id)

    def get_sessions_instructions_response_credentials(self, session_id, request):
        """Build the ``credentials`` entries of the instructions response.

        One entry per applicable cookie detail; the name is the part of
        ``name=value`` before the first ``=``.
        """
        return [
            {
                "type": "cookie",
                "name": detail.get_name_and_value().split("=")[0],
                "attributes": detail.get_attributes(request),
            }
            for detail in self.get_cookie_details(session_id)
        ]

    def get_session_instructions_response_set_cookie_headers(self, session_id, request):
        """Build one ``Set-Cookie`` header tuple per applicable cookie detail."""
        return [
            ("Set-Cookie", f"{detail.get_name_and_value()}; {detail.get_attributes(request)}")
            for detail in self.get_cookie_details(session_id)
        ]

    def get_session_instructions_response(self, session_id, request):
        """Build the full session instructions response for *session_id*.

        Returns a ``(status, headers, body)`` tuple: status 200, the
        Set-Cookie headers followed by Content-Type and Cache-Control, and
        the JSON-encoded instructions. The configured scope specification
        items are followed by exclusions, on the request's hostname, for
        four helper endpoints.
        """
        always_excluded_paths = (
            "/device-bound-session-credentials/request_early_challenge.py",
            "/device-bound-session-credentials/end_session_via_clear_site_data.py",
            "/device-bound-session-credentials/pull_server_state.py",
            "/device-bound-session-credentials/set_cookie.py",
        )
        scope = {
            # An unset scope origin is sent as the empty string.
            "origin": "" if self.scope_origin is None else self.scope_origin,
            "include_site": self.include_site,
            "scope_specification": self.scope_specification_items + [
                {"type": "exclude", "domain": request.url_parts.hostname, "path": path}
                for path in always_excluded_paths
            ],
        }
        response_body = {
            "session_identifier": str(session_id),
            "refresh_url": self.refresh_url,
            "scope": scope,
            "credentials": self.get_sessions_instructions_response_credentials(session_id, request),
        }

        headers = self.get_session_instructions_response_set_cookie_headers(session_id, request)
        headers = headers + [
            ("Content-Type", "application/json"),
            ("Cache-Control", "no-store"),
        ]
        return (200, headers, json.dumps(response_body))
|