diff options
Diffstat (limited to '')
-rw-r--r-- | yt_dlp/extractor/goplay.py | 143 |
1 files changed, 70 insertions, 73 deletions
diff --git a/yt_dlp/extractor/goplay.py b/yt_dlp/extractor/goplay.py index 7a98e0f..dfe5afe 100644 --- a/yt_dlp/extractor/goplay.py +++ b/yt_dlp/extractor/goplay.py @@ -31,7 +31,7 @@ class GoPlayIE(InfoExtractor): 'episode': 'Episode 2', 'episode_number': 2, }, - 'skip': 'This video is only available for registered users' + 'skip': 'This video is only available for registered users', }, { 'url': 'https://www.goplay.be/video/a-family-for-thr-holidays-s1-aflevering-1#autoplay', 'info_dict': { @@ -39,7 +39,7 @@ class GoPlayIE(InfoExtractor): 'ext': 'mp4', 'title': 'A Family for the Holidays', }, - 'skip': 'This video is only available for registered users' + 'skip': 'This video is only available for registered users', }, { 'url': 'https://www.goplay.be/video/de-mol/de-mol-s11/de-mol-s11-aflevering-1#autoplay', 'info_dict': { @@ -50,12 +50,12 @@ class GoPlayIE(InfoExtractor): 'series': 'De Mol', 'season_number': 11, 'episode_number': 1, - 'season': 'Season 11' + 'season': 'Season 11', }, 'params': { - 'skip_download': True + 'skip_download': True, }, - 'skip': 'This video is only available for registered users' + 'skip': 'This video is only available for registered users', }] _id_token = None @@ -79,7 +79,7 @@ class GoPlayIE(InfoExtractor): if movie: video_id = movie['videoUuid'] info_dict = { - 'title': movie.get('title') + 'title': movie.get('title'), } else: episode = traverse_obj(video_data, ('playlists', ..., 'episodes', lambda _, v: v['pageInfo']['url'] == url), get_all=False) @@ -94,7 +94,7 @@ class GoPlayIE(InfoExtractor): api = self._download_json( f'https://api.goplay.be/web/v1/videos/long-form/{video_id}', video_id, headers={ - 'Authorization': 'Bearer %s' % self._id_token, + 'Authorization': f'Bearer {self._id_token}', **self.geo_verification_headers(), }) @@ -154,31 +154,32 @@ class AwsIdp: self.ie = ie self.pool_id = pool_id - if "_" not in self.pool_id: - raise ValueError("Invalid pool_id format. Should be <region>_<poolid>.") + if '_' not in self.pool_id: + raise ValueError('Invalid pool_id format. Should be <region>_<poolid>.') self.client_id = client_id - self.region = self.pool_id.split("_")[0] - self.url = "https://cognito-idp.%s.amazonaws.com/" % (self.region,) + self.region = self.pool_id.split('_')[0] + self.url = f'https://cognito-idp.{self.region}.amazonaws.com/' # Initialize the values # https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22 - self.n_hex = 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1' + \ - '29024E088A67CC74020BBEA63B139B22514A08798E3404DD' + \ - 'EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245' + \ - 'E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED' + \ - 'EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D' + \ - 'C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F' + \ - '83655D23DCA3AD961C62F356208552BB9ED529077096966D' + \ - '670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B' + \ - 'E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9' + \ - 'DE2BCBF6955817183995497CEA956AE515D2261898FA0510' + \ - '15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64' + \ - 'ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7' + \ - 'ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B' + \ - 'F12FFA06D98A0864D87602733EC86A64521F2B18177B200C' + \ - 'BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31' + \ - '43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF' + self.n_hex = ( + 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1' + '29024E088A67CC74020BBEA63B139B22514A08798E3404DD' + 'EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245' + 'E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED' + 'EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D' + 'C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F' + '83655D23DCA3AD961C62F356208552BB9ED529077096966D' + '670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B' + 'E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9' + 'DE2BCBF6955817183995497CEA956AE515D2261898FA0510' + '15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64' + 'ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7' + 'ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B' + 'F12FFA06D98A0864D87602733EC86A64521F2B18177B200C' + 'BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31' + '43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF') # https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49 self.g_hex = '2' @@ -194,26 +195,26 @@ class AwsIdp: """ Authenticate with a username and password. """ # Step 1: First initiate an authentication request auth_data_dict = self.__get_authentication_request(username) - auth_data = json.dumps(auth_data_dict).encode("utf-8") + auth_data = json.dumps(auth_data_dict).encode() auth_headers = { - "X-Amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth", - "Accept-Encoding": "identity", - "Content-Type": "application/x-amz-json-1.1" + 'X-Amz-Target': 'AWSCognitoIdentityProviderService.InitiateAuth', + 'Accept-Encoding': 'identity', + 'Content-Type': 'application/x-amz-json-1.1', } auth_response_json = self.ie._download_json( self.url, None, data=auth_data, headers=auth_headers, note='Authenticating username', errnote='Invalid username') - challenge_parameters = auth_response_json.get("ChallengeParameters") + challenge_parameters = auth_response_json.get('ChallengeParameters') - if auth_response_json.get("ChallengeName") != "PASSWORD_VERIFIER": - raise AuthenticationException(auth_response_json["message"]) + if auth_response_json.get('ChallengeName') != 'PASSWORD_VERIFIER': + raise AuthenticationException(auth_response_json['message']) # Step 2: Respond to the Challenge with a valid ChallengeResponse challenge_request = self.__get_challenge_response_request(challenge_parameters, password) - challenge_data = json.dumps(challenge_request).encode("utf-8") + challenge_data = json.dumps(challenge_request).encode() challenge_headers = { - "X-Amz-Target": "AWSCognitoIdentityProviderService.RespondToAuthChallenge", - "Content-Type": "application/x-amz-json-1.1" + 'X-Amz-Target': 'AWSCognitoIdentityProviderService.RespondToAuthChallenge', + 'Content-Type': 'application/x-amz-json-1.1', } auth_response_json = self.ie._download_json( self.url, None, data=challenge_data, headers=challenge_headers, @@ -223,7 +224,7 @@ class AwsIdp: raise InvalidLoginException(auth_response_json['message']) return ( auth_response_json['AuthenticationResult']['IdToken'], - auth_response_json['AuthenticationResult']['RefreshToken'] + auth_response_json['AuthenticationResult']['RefreshToken'], ) def __get_authentication_request(self, username): @@ -234,15 +235,14 @@ class AwsIdp: :return: A full Authorization request. :rtype: dict """ - auth_request = { - "AuthParameters": { - "USERNAME": username, - "SRP_A": self.__long_to_hex(self.large_a_value) + return { + 'AuthParameters': { + 'USERNAME': username, + 'SRP_A': self.__long_to_hex(self.large_a_value), }, - "AuthFlow": "USER_SRP_AUTH", - "ClientId": self.client_id + 'AuthFlow': 'USER_SRP_AUTH', + 'ClientId': self.client_id, } - return auth_request def __get_challenge_response_request(self, challenge_parameters, password): """ Create a Challenge Response Request object. @@ -253,11 +253,11 @@ class AwsIdp: :return: A valid and full request data object to use as a response for a challenge. :rtype: dict """ - user_id = challenge_parameters["USERNAME"] - user_id_for_srp = challenge_parameters["USER_ID_FOR_SRP"] - srp_b = challenge_parameters["SRP_B"] - salt = challenge_parameters["SALT"] - secret_block = challenge_parameters["SECRET_BLOCK"] + user_id = challenge_parameters['USERNAME'] + user_id_for_srp = challenge_parameters['USER_ID_FOR_SRP'] + srp_b = challenge_parameters['SRP_B'] + salt = challenge_parameters['SALT'] + secret_block = challenge_parameters['SECRET_BLOCK'] timestamp = self.__get_current_timestamp() @@ -266,7 +266,7 @@ class AwsIdp: user_id_for_srp, password, self.__hex_to_long(srp_b), - salt + salt, ) secret_block_bytes = base64.standard_b64decode(secret_block) @@ -278,17 +278,16 @@ class AwsIdp: bytearray(timestamp, 'utf-8') hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256) signature_string = base64.standard_b64encode(hmac_obj.digest()).decode('utf-8') - challenge_request = { - "ChallengeResponses": { - "USERNAME": user_id, - "TIMESTAMP": timestamp, - "PASSWORD_CLAIM_SECRET_BLOCK": secret_block, - "PASSWORD_CLAIM_SIGNATURE": signature_string + return { + 'ChallengeResponses': { + 'USERNAME': user_id, + 'TIMESTAMP': timestamp, + 'PASSWORD_CLAIM_SECRET_BLOCK': secret_block, + 'PASSWORD_CLAIM_SIGNATURE': signature_string, }, - "ChallengeName": "PASSWORD_VERIFIER", - "ClientId": self.client_id + 'ChallengeName': 'PASSWORD_VERIFIER', + 'ClientId': self.client_id, } - return challenge_request def __get_hkdf_key_for_password(self, username, password, server_b_value, salt): """ Calculates the final hkdf based on computed S value, and computed U value and the key. @@ -305,18 +304,17 @@ class AwsIdp: u_value = self.__calculate_u(self.large_a_value, server_b_value) if u_value == 0: raise ValueError('U cannot be zero.') - username_password = '%s%s:%s' % (self.pool_id.split('_')[1], username, password) - username_password_hash = self.__hash_sha256(username_password.encode('utf-8')) + username_password = '{}{}:{}'.format(self.pool_id.split('_')[1], username, password) + username_password_hash = self.__hash_sha256(username_password.encode()) x_value = self.__hex_to_long(self.__hex_hash(self.__pad_hex(salt) + username_password_hash)) g_mod_pow_xn = pow(self.g, x_value, self.big_n) int_value2 = server_b_value - self.k * g_mod_pow_xn s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n) - hkdf = self.__compute_hkdf( + return self.__compute_hkdf( bytearray.fromhex(self.__pad_hex(s_value)), - bytearray.fromhex(self.__pad_hex(self.__long_to_hex(u_value))) + bytearray.fromhex(self.__pad_hex(self.__long_to_hex(u_value))), ) - return hkdf def __compute_hkdf(self, ikm, salt): """ Standard hkdf algorithm @@ -368,7 +366,7 @@ class AwsIdp: @staticmethod def __long_to_hex(long_num): - return '%x' % long_num + return f'{long_num:x}' @staticmethod def __hex_to_long(hex_string): @@ -399,9 +397,9 @@ class AwsIdp: else: hash_str = long_int if len(hash_str) % 2 == 1: - hash_str = '0%s' % hash_str + hash_str = f'0{hash_str}' elif hash_str[0] in '89ABCDEFabcdef': - hash_str = '00%s' % hash_str + hash_str = f'00{hash_str}' return hash_str @staticmethod @@ -423,11 +421,10 @@ class AwsIdp: days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] time_now = dt.datetime.now(dt.timezone.utc) - format_string = "{} {} {} %H:%M:%S UTC %Y".format(days[time_now.weekday()], months[time_now.month], time_now.day) - time_string = time_now.strftime(format_string) - return time_string + format_string = f'{days[time_now.weekday()]} {months[time_now.month]} {time_now.day} %H:%M:%S UTC %Y' + return time_now.strftime(format_string) def __str__(self): - return "AWS IDP Client for:\nRegion: %s\nPoolId: %s\nAppId: %s" % ( - self.region, self.pool_id.split("_")[1], self.client_id + return 'AWS IDP Client for:\nRegion: {}\nPoolId: {}\nAppId: {}'.format( + self.region, self.pool_id.split('_')[1], self.client_id, ) |