import json test_to_session_manager_mapping = {} def initialize_test(): test_id = str(len(test_to_session_manager_mapping)) test_to_session_manager_mapping[test_id] = SessionManager() return test_id def find_for_request(request): test_id = request.cookies.get(b'test_id').value.decode('utf-8') manager = test_to_session_manager_mapping.get(test_id) if manager == None: raise Exception(f"Could not find manager for test_id: {test_id}") return manager class CookieDetail: def __init__(self, name_and_value = None, attributes = None): self.name_and_value = name_and_value self.attributes = attributes def get_name_and_value(self): if self.name_and_value is None: return "auth_cookie=abcdef0123" return self.name_and_value def get_attributes(self, request): if self.attributes is None: return f"Domain={request.url_parts.hostname}; Path=/device-bound-session-credentials" return self.attributes class SessionManager: def __init__(self): self.session_to_key_map = {} self.should_refresh_end_session = False self.authorization_value = None self.scope_origin = None self.registration_sends_challenge = False self.cookie_details = None self.session_to_cookie_details_map = {} self.session_to_early_challenge_map = {} self.has_called_refresh = False self.scope_specification_items = [] self.refresh_sends_challenge = True self.refresh_url = "/device-bound-session-credentials/refresh_session.py" self.include_site = True def next_session_id(self): return len(self.session_to_key_map) def create_new_session(self): session_id = self.next_session_id() self.session_to_key_map[session_id] = None return session_id def set_session_key(self, session_id, key): if session_id not in self.session_to_key_map: return False self.session_to_key_map[session_id] = key return True def get_session_key(self, session_id): return self.session_to_key_map.get(session_id) def get_session_ids(self): return list(self.session_to_key_map.keys()) def configure_state_for_test(self, configuration): should_refresh_end_session = configuration.get("shouldRefreshEndSession") if should_refresh_end_session is not None: self.should_refresh_end_session = should_refresh_end_session authorization_value = configuration.get("authorizationValue") if authorization_value is not None: self.authorization_value = authorization_value scope_origin = configuration.get("scopeOrigin") if scope_origin is not None: self.scope_origin = scope_origin registration_sends_challenge = configuration.get("registrationSendsChallenge") if registration_sends_challenge is not None: self.registration_sends_challenge = registration_sends_challenge cookie_details = configuration.get("cookieDetails") if cookie_details is not None: self.cookie_details = [] for detail in cookie_details: self.cookie_details.append(CookieDetail(detail.get("nameAndValue"), detail.get("attributes"))) next_sessions_cookie_details = configuration.get("cookieDetailsForNextRegisteredSessions") if next_sessions_cookie_details is not None: next_session_id = self.next_session_id() for session in next_sessions_cookie_details: self.session_to_cookie_details_map[next_session_id] = [] for detail in session: self.session_to_cookie_details_map[next_session_id].append(CookieDetail(detail.get("nameAndValue"), detail.get("attributes"))) next_session_id += 1 next_session_early_challenge = configuration.get("earlyChallengeForNextRegisteredSession") if next_session_early_challenge is not None: self.session_to_early_challenge_map[self.next_session_id()] = next_session_early_challenge scope_specification_items = configuration.get("scopeSpecificationItems") if scope_specification_items is not None: self.scope_specification_items = scope_specification_items refresh_sends_challenge = configuration.get("refreshSendsChallenge") if refresh_sends_challenge is not None: self.refresh_sends_challenge = refresh_sends_challenge refresh_url = configuration.get("refreshUrl") if refresh_url is not None: self.refresh_url = refresh_url include_site = configuration.get("includeSite") if include_site is not None: self.include_site = include_site def get_should_refresh_end_session(self): return self.should_refresh_end_session def get_authorization_value(self): return self.authorization_value def get_registration_sends_challenge(self): return self.registration_sends_challenge def reset_registration_sends_challenge(self): self.registration_sends_challenge = False def get_refresh_sends_challenge(self): return self.refresh_sends_challenge def set_has_called_refresh(self, has_called_refresh): self.has_called_refresh = has_called_refresh def pull_server_state(self): return { "hasCalledRefresh": self.has_called_refresh } def get_cookie_details(self, session_id): # Try to use the session-specific override first. if self.session_to_cookie_details_map.get(session_id) is not None: return self.session_to_cookie_details_map[session_id] # If there isn't any, use the general override. if self.cookie_details is not None: return self.cookie_details return [CookieDetail()] def get_early_challenge(self, session_id): return self.session_to_early_challenge_map.get(session_id) def get_sessions_instructions_response_credentials(self, session_id, request): return list(map(lambda cookie_detail: { "type": "cookie", "name": cookie_detail.get_name_and_value().split("=")[0], "attributes": cookie_detail.get_attributes(request) }, self.get_cookie_details(session_id))) def get_session_instructions_response_set_cookie_headers(self, session_id, request): header_values = list(map( lambda cookie_detail: f"{cookie_detail.get_name_and_value()}; {cookie_detail.get_attributes(request)}", self.get_cookie_details(session_id) )) return [("Set-Cookie", header_value) for header_value in header_values] def get_session_instructions_response(self, session_id, request): scope_origin = "" if self.scope_origin is not None: scope_origin = self.scope_origin response_body = { "session_identifier": str(session_id), "refresh_url": self.refresh_url, "scope": { "origin": scope_origin, "include_site": self.include_site, "scope_specification" : self.scope_specification_items + [ { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/request_early_challenge.py" }, { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/end_session_via_clear_site_data.py" }, { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/pull_server_state.py" }, { "type": "exclude", "domain": request.url_parts.hostname, "path": "/device-bound-session-credentials/set_cookie.py" }, ] }, "credentials": self.get_sessions_instructions_response_credentials(session_id, request) } headers = self.get_session_instructions_response_set_cookie_headers(session_id, request) + [ ("Content-Type", "application/json"), ("Cache-Control", "no-store") ] return (200, headers, json.dumps(response_body))