Diffstat (limited to 'yt_dlp/extractor/common.py')
-rw-r--r-- | yt_dlp/extractor/common.py | 148
1 file changed, 79 insertions, 69 deletions
diff --git a/yt_dlp/extractor/common.py b/yt_dlp/extractor/common.py
index 1d2c443..f63bd78 100644
--- a/yt_dlp/extractor/common.py
+++ b/yt_dlp/extractor/common.py
@@ -60,7 +60,6 @@ from ..utils import (
     determine_ext,
     dict_get,
     encode_data_uri,
-    error_to_compat_str,
     extract_attributes,
     filter_dict,
     fix_xml_ampersands,
@@ -235,7 +234,14 @@ class InfoExtractor:
                                     'maybe' if the format may have DRM and has to be tested before download.
                 * extra_param_to_segment_url  A query string to append to each
                                     fragment's URL, or to update each existing query string
-                                    with. Only applied by the native HLS/DASH downloaders.
+                                    with. If it is an HLS stream with an AES-128 decryption key,
+                                    the query parameters will be passed to the key URI as well,
+                                    unless there is an `extra_param_to_key_url` given,
+                                    or unless an external key URI is provided via `hls_aes`.
+                                    Only applied by the native HLS/DASH downloaders.
+                 * extra_param_to_key_url  A query string to append to the URL
+                                    of the format's HLS AES-128 decryption key.
+                                    Only applied by the native HLS downloader.
                 * hls_aes           A dictionary of HLS AES-128 decryption information
                                     used by the native HLS downloader to override the
                                     values in the media playlist when an '#EXT-X-KEY' tag
@@ -767,8 +773,8 @@ class InfoExtractor:
                 self._x_forwarded_for_ip = GeoUtils.random_ipv4(country_code)
                 if self._x_forwarded_for_ip:
                     self.report_warning(
-                        'Video is geo restricted. Retrying extraction with fake IP %s (%s) as X-Forwarded-For.'
-                        % (self._x_forwarded_for_ip, country_code.upper()))
+                        'Video is geo restricted. Retrying extraction with fake IP '
+                        f'{self._x_forwarded_for_ip} ({country_code.upper()}) as X-Forwarded-For.')
                     return True

         return False
@@ -841,7 +847,7 @@ class InfoExtractor:
         if not self._downloader._first_webpage_request:
             sleep_interval = self.get_param('sleep_interval_requests') or 0
             if sleep_interval > 0:
-                self.to_screen('Sleeping %s seconds ...' % sleep_interval)
+                self.to_screen(f'Sleeping {sleep_interval} seconds ...')
                 time.sleep(sleep_interval)
         else:
             self._downloader._first_webpage_request = False
@@ -898,7 +904,7 @@ class InfoExtractor:
         if errnote is None:
             errnote = 'Unable to download webpage'

-        errmsg = f'{errnote}: {error_to_compat_str(err)}'
+        errmsg = f'{errnote}: {err}'
         if fatal:
             raise ExtractorError(errmsg, cause=err)
         else:
@@ -987,7 +993,7 @@ class InfoExtractor:
                 r'<iframe src="([^"]+)"', content,
                 'Websense information URL', default=None)
             if blocked_iframe:
-                msg += ' Visit %s for more details' % blocked_iframe
+                msg += f' Visit {blocked_iframe} for more details'
             raise ExtractorError(msg, expected=True)
         if '<title>The URL you requested has been blocked</title>' in first_block:
             msg = (
@@ -997,7 +1003,7 @@ class InfoExtractor:
                 r'</h1><p>(.*?)</p>', content,
                 'block message', default=None)
             if block_msg:
-                msg += ' (Message: "%s")' % block_msg.replace('\n', ' ')
+                msg += ' (Message: "{}")'.format(block_msg.replace('\n', ' '))
             raise ExtractorError(msg, expected=True)
         if ('<title>TTK :: Доступ к ресурсу ограничен</title>' in content
                 and 'blocklist.rkn.gov.ru' in content):
@@ -1012,7 +1018,7 @@ class InfoExtractor:
         basen = join_nonempty(video_id, data, url, delim='_')
         trim_length = self.get_param('trim_file_name') or 240
         if len(basen) > trim_length:
-            h = '___' + hashlib.md5(basen.encode('utf-8')).hexdigest()
+            h = '___' + hashlib.md5(basen.encode()).hexdigest()
             basen = basen[:trim_length - len(h)] + h
         filename = sanitize_filename(f'{basen}.dump', restricted=True)
         # Working around MAX_PATH limitation on Windows (see
@@ -1063,7 +1069,7 @@ class InfoExtractor:
         if transform_source:
             xml_string = transform_source(xml_string)
         try:
-            return compat_etree_fromstring(xml_string.encode('utf-8'))
+            return compat_etree_fromstring(xml_string.encode())
         except xml.etree.ElementTree.ParseError as ve:
             self.__print_error('Failed to parse XML' if errnote is None else errnote, fatal, video_id, ve)

@@ -1214,11 +1220,11 @@ class InfoExtractor:
     def report_extraction(self, id_or_name):
         """Report information extraction."""
-        self.to_screen('%s: Extracting information' % id_or_name)
+        self.to_screen(f'{id_or_name}: Extracting information')

     def report_download_webpage(self, video_id):
         """Report webpage download."""
-        self.to_screen('%s: Downloading webpage' % video_id)
+        self.to_screen(f'{video_id}: Downloading webpage')

     def report_age_confirmation(self):
         """Report attempt to confirm age."""
@@ -1324,9 +1330,9 @@ class InfoExtractor:
         elif default is not NO_DEFAULT:
             return default
         elif fatal:
-            raise RegexNotFoundError('Unable to extract %s' % _name)
+            raise RegexNotFoundError(f'Unable to extract {_name}')
         else:
-            self.report_warning('unable to extract %s' % _name + bug_reports_message())
+            self.report_warning(f'unable to extract {_name}' + bug_reports_message())
             return None

     def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='',
@@ -1425,14 +1431,14 @@ class InfoExtractor:
         if tfa is not None:
             return tfa

-        return getpass.getpass('Type %s and press [Return]: ' % note)
+        return getpass.getpass(f'Type {note} and press [Return]: ')

     # Helper functions for extracting OpenGraph info
     @staticmethod
     def _og_regexes(prop):
         content_re = r'content=(?:"([^"]+?)"|\'([^\']+?)\'|\s*([^\s"\'=<>`]+?)(?=\s|/?>))'
-        property_re = (r'(?:name|property)=(?:\'og%(sep)s%(prop)s\'|"og%(sep)s%(prop)s"|\s*og%(sep)s%(prop)s\b)'
-                       % {'prop': re.escape(prop), 'sep': '(?::|[:-])'})
+        property_re = r'(?:name|property)=(?:\'og{sep}{prop}\'|"og{sep}{prop}"|\s*og{sep}{prop}\b)'.format(
+            prop=re.escape(prop), sep='(?::|[:-])')
         template = r'<meta[^>]+?%s[^>]+?%s'
         return [
             template % (property_re, content_re),
@@ -1441,14 +1447,14 @@ class InfoExtractor:

     @staticmethod
     def _meta_regex(prop):
-        return r'''(?isx)<meta
-                    (?=[^>]+(?:itemprop|name|property|id|http-equiv)=(["\']?)%s\1)
-                    [^>]+?content=(["\'])(?P<content>.*?)\2''' % re.escape(prop)
+        return rf'''(?isx)<meta
+                    (?=[^>]+(?:itemprop|name|property|id|http-equiv)=(["\']?){re.escape(prop)}\1)
+                    [^>]+?content=(["\'])(?P<content>.*?)\2'''

     def _og_search_property(self, prop, html, name=None, **kargs):
         prop = variadic(prop)
         if name is None:
-            name = 'OpenGraph %s' % prop[0]
+            name = f'OpenGraph {prop[0]}'
         og_regexes = []
         for p in prop:
             og_regexes.extend(self._og_regexes(p))
@@ -1571,7 +1577,7 @@ class InfoExtractor:
         elif fatal:
             raise RegexNotFoundError('Unable to extract JSON-LD')
         else:
-            self.report_warning('unable to extract JSON-LD %s' % bug_reports_message())
+            self.report_warning(f'unable to extract JSON-LD {bug_reports_message()}')
             return {}

     def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
@@ -1593,8 +1599,8 @@ class InfoExtractor:
         }

         def is_type(e, *expected_types):
-            type = variadic(traverse_obj(e, '@type'))
-            return any(x in type for x in expected_types)
+            type_ = variadic(traverse_obj(e, '@type'))
+            return any(x in type_ for x in expected_types)

         def extract_interaction_type(e):
             interaction_type = e.get('interactionType')
@@ -1623,7 +1629,7 @@ class InfoExtractor:
             count_kind = INTERACTION_TYPE_MAP.get(interaction_type.split('/')[-1])
             if not count_kind:
                 continue
-            count_key = '%s_count' % count_kind
+            count_key = f'{count_kind}_count'
             if info.get(count_key) is not None:
                 continue
             info[count_key] = interaction_count
@@ -1635,7 +1641,7 @@ class InfoExtractor:
                 'end_time': part.get('endOffset'),
             } for part in variadic(e.get('hasPart') or []) if part.get('@type') == 'Clip']
             for idx, (last_c, current_c, next_c) in enumerate(zip(
-                    [{'end_time': 0}] + chapters, chapters, chapters[1:])):
+                    [{'end_time': 0}, *chapters], chapters, chapters[1:])):
                 current_c['end_time'] = current_c['end_time'] or next_c['start_time']
                 current_c['start_time'] = current_c['start_time'] or last_c['end_time']
                 if None in current_c.values():
@@ -1776,9 +1782,9 @@ class InfoExtractor:
     def _hidden_inputs(html):
         html = re.sub(r'<!--(?:(?!<!--).)*-->', '', html)
         hidden_inputs = {}
-        for input in re.findall(r'(?i)(<input[^>]+>)', html):
-            attrs = extract_attributes(input)
-            if not input:
+        for input_el in re.findall(r'(?i)(<input[^>]+>)', html):
+            attrs = extract_attributes(input_el)
+            if not input_el:
                 continue
             if attrs.get('type') not in ('hidden', 'submit'):
                 continue
@@ -1790,8 +1796,8 @@ class InfoExtractor:

     def _form_hidden_inputs(self, form_id, html):
         form = self._search_regex(
-            r'(?is)<form[^>]+?id=(["\'])%s\1[^>]*>(?P<form>.+?)</form>' % form_id,
-            html, '%s form' % form_id, group='form')
+            rf'(?is)<form[^>]+?id=(["\']){form_id}\1[^>]*>(?P<form>.+?)</form>',
+            html, f'{form_id} form', group='form')
         return self._hidden_inputs(form)

     @classproperty(cache=True)
@@ -1821,7 +1827,7 @@ class InfoExtractor:
         formats[:] = filter(
             lambda f: self._is_valid_url(
                 f['url'], video_id,
-                item='%s video format' % f.get('format_id') if f.get('format_id') else 'video'),
+                item='{} video format'.format(f.get('format_id')) if f.get('format_id') else 'video'),
             formats)

     @staticmethod
@@ -1837,15 +1843,14 @@ class InfoExtractor:
     def _is_valid_url(self, url, video_id, item='video', headers={}):
         url = self._proto_relative_url(url, scheme='http:')
         # For now assume non HTTP(S) URLs always valid
-        if not (url.startswith('http://') or url.startswith('https://')):
+        if not url.startswith(('http://', 'https://')):
             return True
         try:
-            self._request_webpage(url, video_id, 'Checking %s URL' % item, headers=headers)
+            self._request_webpage(url, video_id, f'Checking {item} URL', headers=headers)
             return True
         except ExtractorError as e:
             self.to_screen(
-                '%s: %s URL is invalid, skipping: %s'
-                % (video_id, item, error_to_compat_str(e.cause)))
+                f'{video_id}: {item} URL is invalid, skipping: {e.cause!s}')
             return False

     def http_scheme(self):
@@ -1899,8 +1904,8 @@ class InfoExtractor:
         # currently yt-dlp cannot decode the playerVerificationChallenge as Akamai uses Adobe Alchemy
         akamai_pv = manifest.find('{http://ns.adobe.com/f4m/1.0}pv-2.0')
         if akamai_pv is not None and ';' in akamai_pv.text:
-            playerVerificationChallenge = akamai_pv.text.split(';')[0]
-            if playerVerificationChallenge.strip() != '':
+            player_verification_challenge = akamai_pv.text.split(';')[0]
+            if player_verification_challenge.strip() != '':
                 return []

         formats = []
@@ -1946,7 +1951,7 @@ class InfoExtractor:
             if not media_url:
                 continue
             manifest_url = (
-                media_url if media_url.startswith('http://') or media_url.startswith('https://')
+                media_url if media_url.startswith(('http://', 'https://'))
                 else ((manifest_base_url or '/'.join(manifest_url.split('/')[:-1])) + '/' + media_url))
             # If media_url is itself a f4m manifest do the recursive extraction
             # since bitrates in parent manifest (this one) and media_url manifest
@@ -2007,7 +2012,7 @@ class InfoExtractor:
     def _report_ignoring_subs(self, name):
         self.report_warning(bug_reports_message(
             f'Ignoring subtitle tracks found in the {name} manifest; '
-            'if any subtitle tracks are missing,'
+            'if any subtitle tracks are missing,',
         ), only_once=True)

     def _extract_m3u8_formats(self, *args, **kwargs):
@@ -2098,7 +2103,7 @@ class InfoExtractor:
             formats = [{
                 'format_id': join_nonempty(m3u8_id, idx),
                 'format_index': idx,
-                'url': m3u8_url or encode_data_uri(m3u8_doc.encode('utf-8'), 'application/x-mpegurl'),
+                'url': m3u8_url or encode_data_uri(m3u8_doc.encode(), 'application/x-mpegurl'),
                 'ext': ext,
                 'protocol': entry_protocol,
                 'preference': preference,
@@ -2217,6 +2222,11 @@ class InfoExtractor:
                     'quality': quality,
                     'has_drm': has_drm,
                 }
+
+                # YouTube-specific
+                if yt_audio_content_id := last_stream_inf.get('YT-EXT-AUDIO-CONTENT-ID'):
+                    f['language'] = yt_audio_content_id.split('.')[0]
+
                 resolution = last_stream_inf.get('RESOLUTION')
                 if resolution:
                     mobj = re.search(r'(?P<width>\d+)[xX](?P<height>\d+)', resolution)
@@ -2310,7 +2320,7 @@ class InfoExtractor:
             if not c or c == '.':
                 out.append(c)
             else:
-                out.append('{%s}%s' % (namespace, c))
+                out.append(f'{{{namespace}}}{c}')
         return '/'.join(out)

     def _extract_smil_formats_and_subtitles(self, smil_url, video_id, fatal=True, f4m_params=None, transform_source=None):
@@ -2507,7 +2517,7 @@ class InfoExtractor:

                 imgs_count += 1
                 formats.append({
-                    'format_id': 'imagestream-%d' % (imgs_count),
+                    'format_id': f'imagestream-{imgs_count}',
                     'url': src,
                     'ext': mimetype2ext(medium.get('type')),
                     'acodec': 'none',
@@ -2525,7 +2535,7 @@ class InfoExtractor:
     def _parse_smil_subtitles(self, smil, namespace=None, subtitles_lang='en'):
         urls = []
         subtitles = {}
-        for num, textstream in enumerate(smil.findall(self._xpath_ns('.//textstream', namespace))):
+        for textstream in smil.findall(self._xpath_ns('.//textstream', namespace)):
             src = textstream.get('src')
             if not src or src in urls:
                 continue
@@ -2656,7 +2666,7 @@ class InfoExtractor:
                 if subtitles and period['subtitles']:
                     self.report_warning(bug_reports_message(
                         'Found subtitles in multiple periods in the DASH manifest; '
-                        'if part of the subtitles are missing,'
+                        'if part of the subtitles are missing,',
                     ), only_once=True)

                 for sub_lang, sub_info in period['subtitles'].items():
@@ -2782,7 +2792,7 @@ class InfoExtractor:
                     elif mimetype2ext(mime_type) in ('tt', 'dfxp', 'ttml', 'xml', 'json'):
                         content_type = 'text'
                     else:
-                        self.report_warning('Unknown MIME type %s in DASH manifest' % mime_type)
+                        self.report_warning(f'Unknown MIME type {mime_type} in DASH manifest')
                         continue

                 base_url = ''
@@ -2820,10 +2830,10 @@ class InfoExtractor:
                         'asr': int_or_none(representation_attrib.get('audioSamplingRate')),
                         'fps': int_or_none(representation_attrib.get('frameRate')),
                         'language': lang if lang not in ('mul', 'und', 'zxx', 'mis') else None,
-                        'format_note': 'DASH %s' % content_type,
+                        'format_note': f'DASH {content_type}',
                         'filesize': filesize,
                         'container': mimetype2ext(mime_type) + '_dash',
-                        **codecs
+                        **codecs,
                     }
                 elif content_type == 'text':
                     f = {
@@ -2864,8 +2874,8 @@ class InfoExtractor:
                             t += c
                     # Next, $...$ templates are translated to their
                     # %(...) counterparts to be used with % operator
-                    t = re.sub(r'\$(%s)\$' % '|'.join(identifiers), r'%(\1)d', t)
-                    t = re.sub(r'\$(%s)%%([^$]+)\$' % '|'.join(identifiers), r'%(\1)\2', t)
+                    t = re.sub(r'\$({})\$'.format('|'.join(identifiers)), r'%(\1)d', t)
+                    t = re.sub(r'\$({})%([^$]+)\$'.format('|'.join(identifiers)), r'%(\1)\2', t)
                     t.replace('$$', '$')
                     return t

@@ -2928,12 +2938,12 @@ class InfoExtractor:
                                 'duration': float_or_none(segment_d, representation_ms_info['timescale']),
                             })

-                        for num, s in enumerate(representation_ms_info['s']):
+                        for s in representation_ms_info['s']:
                             segment_time = s.get('t') or segment_time
                             segment_d = s['d']
                             add_segment_url()
                             segment_number += 1
-                            for r in range(s.get('r', 0)):
+                            for _ in range(s.get('r', 0)):
                                 segment_time += segment_d
                                 add_segment_url()
                                 segment_number += 1
@@ -2947,7 +2957,7 @@ class InfoExtractor:
                         timescale = representation_ms_info['timescale']
                         for s in representation_ms_info['s']:
                             duration = float_or_none(s['d'], timescale)
-                            for r in range(s.get('r', 0) + 1):
+                            for _ in range(s.get('r', 0) + 1):
                                 segment_uri = representation_ms_info['segment_urls'][segment_index]
                                 fragments.append({
                                     location_key(segment_uri): segment_uri,
@@ -3055,7 +3065,7 @@ class InfoExtractor:
             fourcc = track.get('FourCC') or KNOWN_TAGS.get(track.get('AudioTag'))
             # TODO: add support for WVC1 and WMAP
             if fourcc not in ('H264', 'AVC1', 'AACL', 'TTML', 'EC-3'):
-                self.report_warning('%s is not a supported codec' % fourcc)
+                self.report_warning(f'{fourcc} is not a supported codec')
                 continue
             tbr = int(track.attrib['Bitrate']) // 1000
             # [1] does not mention Width and Height attributes. However,
@@ -3104,7 +3114,7 @@ class InfoExtractor:
                         'fourcc': fourcc,
                         'language': stream_language,
                         'codec_private_data': track.get('CodecPrivateData'),
-                    }
+                    },
                 })
             elif stream_type in ('video', 'audio'):
                 formats.append({
@@ -3186,13 +3196,13 @@ class InfoExtractor:
         _MEDIA_TAG_NAME_RE = r'(?:(?:amp|dl8(?:-live)?)-)?(video|audio)'
         media_tags = [(media_tag, media_tag_name, media_type, '')
                       for media_tag, media_tag_name, media_type
-                      in re.findall(r'(?s)(<(%s)[^>]*/>)' % _MEDIA_TAG_NAME_RE, webpage)]
+                      in re.findall(rf'(?s)(<({_MEDIA_TAG_NAME_RE})[^>]*/>)', webpage)]
         media_tags.extend(re.findall(
             # We only allow video|audio followed by a whitespace or '>'.
             # Allowing more characters may end up in significant slow down (see
             # https://github.com/ytdl-org/youtube-dl/issues/11979,
             # e.g. http://www.porntrex.com/maps/videositemap.xml).
-            r'(?s)(<(?P<tag>%s)(?:\s+[^>]*)?>)(.*?)</(?P=tag)>' % _MEDIA_TAG_NAME_RE, webpage))
+            rf'(?s)(<(?P<tag>{_MEDIA_TAG_NAME_RE})(?:\s+[^>]*)?>)(.*?)</(?P=tag)>', webpage))
         for media_tag, _, media_type, media_content in media_tags:
            media_info = {
                 'formats': [],
@@ -3336,13 +3346,13 @@ class InfoExtractor:
         mobj = re.search(
             r'(?:(?:http|rtmp|rtsp)(?P<s>s)?:)?(?P<url>//[^?]+)', url)
         url_base = mobj.group('url')
-        http_base_url = '%s%s:%s' % ('http', mobj.group('s') or '', url_base)
+        http_base_url = '{}{}:{}'.format('http', mobj.group('s') or '', url_base)
         formats = []

         def manifest_url(manifest):
             m_url = f'{http_base_url}/{manifest}'
             if query:
-                m_url += '?%s' % query
+                m_url += f'?{query}'
             return m_url

         if 'm3u8' not in skip_protocols:
@@ -3364,7 +3374,7 @@ class InfoExtractor:
                 video_id, fatal=False)
             for rtmp_format in rtmp_formats:
                 rtsp_format = rtmp_format.copy()
-                rtsp_format['url'] = '%s/%s' % (rtmp_format['url'], rtmp_format['play_path'])
+                rtsp_format['url'] = '{}/{}'.format(rtmp_format['url'], rtmp_format['play_path'])
                 del rtsp_format['play_path']
                 del rtsp_format['ext']
                 rtsp_format.update({
@@ -3431,7 +3441,7 @@ class InfoExtractor:
             if not track_url:
                 continue
             subtitles.setdefault(track.get('label') or 'en', []).append({
-                'url': self._proto_relative_url(track_url)
+                'url': self._proto_relative_url(track_url),
             })

         entry = {
@@ -3510,7 +3520,7 @@ class InfoExtractor:
                 'tbr': int_or_none(source.get('bitrate'), scale=1000),
                 'filesize': int_or_none(source.get('filesize')),
                 'ext': ext,
-                'format_id': format_id
+                'format_id': format_id,
             }
             if source_url.startswith('rtmp'):
                 a_format['ext'] = 'flv'
@@ -3584,7 +3594,7 @@ class InfoExtractor:
                 continue
             cookies = cookies.encode('iso-8859-1').decode('utf-8')
             cookie_value = re.search(
-                r'%s=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)' % cookie, cookies)
+                rf'{cookie}=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)', cookies)
             if cookie_value:
                 value, domain = cookie_value.groups()
                 self._set_cookie(domain, cookie, value)
@@ -3668,7 +3678,7 @@ class InfoExtractor:
             desc += ' (**Currently broken**)' if markdown else ' (Currently broken)'

         # Escape emojis. Ref: https://github.com/github/markup/issues/1153
-        name = (' - **%s**' % re.sub(r':(\w+:)', ':\u200B\\g<1>', cls.IE_NAME)) if markdown else cls.IE_NAME
+        name = (' - **{}**'.format(re.sub(r':(\w+:)', ':\u200B\\g<1>', cls.IE_NAME))) if markdown else cls.IE_NAME
         return f'{name}:{desc}' if desc else name

     def extract_subtitles(self, *args, **kwargs):
@@ -3708,7 +3718,7 @@ class InfoExtractor:
             self.to_screen(f'Extracted {comment_count} comments')
             return {
                 'comments': comments,
-                'comment_count': None if interrupted else comment_count
+                'comment_count': None if interrupted else comment_count,
             }
         return extractor

@@ -3812,9 +3822,9 @@ class InfoExtractor:

     @staticmethod
     def _availability(is_private=None, needs_premium=None, needs_subscription=None, needs_auth=None, is_unlisted=None):
-        all_known = all(map(
-            lambda x: x is not None,
-            (is_private, needs_premium, needs_subscription, needs_auth, is_unlisted)))
+        all_known = all(
+            x is not None for x in
+            (is_private, needs_premium, needs_subscription, needs_auth, is_unlisted))
         return (
             'private' if is_private
             else 'premium_only' if needs_premium
@@ -3934,7 +3944,7 @@ class SearchInfoExtractor(InfoExtractor):

     @classproperty
     def _VALID_URL(cls):
-        return r'%s(?P<prefix>|[1-9][0-9]*|all):(?P<query>[\s\S]+)' % cls._SEARCH_KEY
+        return rf'{cls._SEARCH_KEY}(?P<prefix>|[1-9][0-9]*|all):(?P<query>[\s\S]+)'

     def _real_extract(self, query):
         prefix, query = self._match_valid_url(query).group('prefix', 'query')
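Note: the docstring change above introduces `extra_param_to_key_url` as a per-format field consumed by the native HLS downloader. A minimal sketch of how an extractor might populate it follows; the extractor name, domain, and token value are hypothetical and not part of this commit:

    from yt_dlp.extractor.common import InfoExtractor

    class ExampleIE(InfoExtractor):  # hypothetical extractor, for illustration only
        _VALID_URL = r'https?://example\.com/video/(?P<id>\w+)'

        def _real_extract(self, url):
            video_id = self._match_id(url)
            formats = self._extract_m3u8_formats(
                f'https://example.com/hls/{video_id}/master.m3u8', video_id, 'mp4')
            for f in formats:
                # Appended by the native HLS downloader to the #EXT-X-KEY URI
                # when fetching the AES-128 key (hypothetical token parameter)
                f['extra_param_to_key_url'] = 'token=SAMPLE'
            return {'id': video_id, 'title': video_id, 'formats': formats}

Per the updated docstring, setting only `extra_param_to_segment_url` would also apply the query parameters to the key URI, unless `extra_param_to_key_url` is given or an external key URI is provided via `hls_aes`.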