# mypy: allow-untyped-defs from ..data.session import Session, UNKNOWN from datetime import datetime import dateutil.parser from dateutil.tz import tzutc def deserialize_sessions(session_dicts): sessions = [] for session_dict in session_dicts: session = deserialize_session(session_dict) sessions.append(session) return sessions def deserialize_session(session_dict): token = "" if "token" in session_dict: token = session_dict["token"] tests = {"include": [], "exclude": []} if "tests" in session_dict: tests = session_dict["tests"] if "path" in session_dict: test_paths = session_dict["path"].split(", ") tests["include"] = tests["include"] + test_paths test_types = [] if "types" in session_dict: test_types = session_dict["types"] user_agent = "" if "user_agent" in session_dict: user_agent = session_dict["user_agent"] labels = [] if "labels" in session_dict: labels = session_dict["labels"] timeouts = {} if "timeouts" in session_dict: timeouts = session_dict["timeouts"] pending_tests = None if "pending_tests" in session_dict: pending_tests = session_dict["pending_tests"] running_tests = None if "running_tests" in session_dict: running_tests = session_dict["running_tests"] status = UNKNOWN if "status" in session_dict: status = session_dict["status"] test_state = None if "test_state" in session_dict: test_state = session_dict["test_state"] last_completed_test = None if "last_completed_test" in session_dict: last_completed_test = session_dict["last_completed_test"] date_created = None if "date_created" in session_dict: date_created = session_dict["date_created"] date_created = iso_to_millis(date_created) date_started = None if "date_started" in session_dict: date_started = session_dict["date_started"] date_started = iso_to_millis(date_started) date_finished = None if "date_finished" in session_dict: date_finished = session_dict["date_finished"] date_finished = iso_to_millis(date_finished) is_public = False if "is_public" in session_dict: is_public = session_dict["is_public"] reference_tokens = [] if "reference_tokens" in session_dict: reference_tokens = session_dict["reference_tokens"] browser = None if "browser" in session_dict: browser = session_dict["browser"] expiration_date = None if "expiration_date" in session_dict: expiration_date = session_dict["expiration_date"] expiration_date = iso_to_millis(expiration_date) type = None if "type" in session_dict: type = session_dict["type"] malfunctioning_tests = [] if "malfunctioning_tests" in session_dict: malfunctioning_tests = session_dict["malfunctioning_tests"] return Session( token=token, tests=tests, test_types=test_types, user_agent=user_agent, labels=labels, timeouts=timeouts, pending_tests=pending_tests, running_tests=running_tests, status=status, test_state=test_state, last_completed_test=last_completed_test, date_created=date_created, date_started=date_started, date_finished=date_finished, is_public=is_public, reference_tokens=reference_tokens, browser=browser, expiration_date=expiration_date, type=type, malfunctioning_tests=malfunctioning_tests ) def iso_to_millis(iso_string): if iso_string is None: return None try: date = dateutil.parser.isoparse(iso_string) date = date.replace(tzinfo=tzutc()) epoch = datetime.utcfromtimestamp(0).replace(tzinfo=tzutc()) return int((date - epoch).total_seconds() * 1000) except Exception: return iso_string