summaryrefslogtreecommitdiffstats
path: root/yt_dlp/extractor/common.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--yt_dlp/extractor/common.py148
1 files changed, 79 insertions, 69 deletions
diff --git a/yt_dlp/extractor/common.py b/yt_dlp/extractor/common.py
index 1d2c443..f63bd78 100644
--- a/yt_dlp/extractor/common.py
+++ b/yt_dlp/extractor/common.py
@@ -60,7 +60,6 @@ from ..utils import (
determine_ext,
dict_get,
encode_data_uri,
- error_to_compat_str,
extract_attributes,
filter_dict,
fix_xml_ampersands,
@@ -235,7 +234,14 @@ class InfoExtractor:
'maybe' if the format may have DRM and has to be tested before download.
* extra_param_to_segment_url A query string to append to each
fragment's URL, or to update each existing query string
- with. Only applied by the native HLS/DASH downloaders.
+ with. If it is an HLS stream with an AES-128 decryption key,
+ the query paramaters will be passed to the key URI as well,
+ unless there is an `extra_param_to_key_url` given,
+ or unless an external key URI is provided via `hls_aes`.
+ Only applied by the native HLS/DASH downloaders.
+ * extra_param_to_key_url A query string to append to the URL
+ of the format's HLS AES-128 decryption key.
+ Only applied by the native HLS downloader.
* hls_aes A dictionary of HLS AES-128 decryption information
used by the native HLS downloader to override the
values in the media playlist when an '#EXT-X-KEY' tag
@@ -767,8 +773,8 @@ class InfoExtractor:
self._x_forwarded_for_ip = GeoUtils.random_ipv4(country_code)
if self._x_forwarded_for_ip:
self.report_warning(
- 'Video is geo restricted. Retrying extraction with fake IP %s (%s) as X-Forwarded-For.'
- % (self._x_forwarded_for_ip, country_code.upper()))
+ 'Video is geo restricted. Retrying extraction with fake IP '
+ f'{self._x_forwarded_for_ip} ({country_code.upper()}) as X-Forwarded-For.')
return True
return False
@@ -841,7 +847,7 @@ class InfoExtractor:
if not self._downloader._first_webpage_request:
sleep_interval = self.get_param('sleep_interval_requests') or 0
if sleep_interval > 0:
- self.to_screen('Sleeping %s seconds ...' % sleep_interval)
+ self.to_screen(f'Sleeping {sleep_interval} seconds ...')
time.sleep(sleep_interval)
else:
self._downloader._first_webpage_request = False
@@ -898,7 +904,7 @@ class InfoExtractor:
if errnote is None:
errnote = 'Unable to download webpage'
- errmsg = f'{errnote}: {error_to_compat_str(err)}'
+ errmsg = f'{errnote}: {err}'
if fatal:
raise ExtractorError(errmsg, cause=err)
else:
@@ -987,7 +993,7 @@ class InfoExtractor:
r'<iframe src="([^"]+)"', content,
'Websense information URL', default=None)
if blocked_iframe:
- msg += ' Visit %s for more details' % blocked_iframe
+ msg += f' Visit {blocked_iframe} for more details'
raise ExtractorError(msg, expected=True)
if '<title>The URL you requested has been blocked</title>' in first_block:
msg = (
@@ -997,7 +1003,7 @@ class InfoExtractor:
r'</h1><p>(.*?)</p>',
content, 'block message', default=None)
if block_msg:
- msg += ' (Message: "%s")' % block_msg.replace('\n', ' ')
+ msg += ' (Message: "{}")'.format(block_msg.replace('\n', ' '))
raise ExtractorError(msg, expected=True)
if ('<title>TTK :: Доступ к ресурсу ограничен</title>' in content
and 'blocklist.rkn.gov.ru' in content):
@@ -1012,7 +1018,7 @@ class InfoExtractor:
basen = join_nonempty(video_id, data, url, delim='_')
trim_length = self.get_param('trim_file_name') or 240
if len(basen) > trim_length:
- h = '___' + hashlib.md5(basen.encode('utf-8')).hexdigest()
+ h = '___' + hashlib.md5(basen.encode()).hexdigest()
basen = basen[:trim_length - len(h)] + h
filename = sanitize_filename(f'{basen}.dump', restricted=True)
# Working around MAX_PATH limitation on Windows (see
@@ -1063,7 +1069,7 @@ class InfoExtractor:
if transform_source:
xml_string = transform_source(xml_string)
try:
- return compat_etree_fromstring(xml_string.encode('utf-8'))
+ return compat_etree_fromstring(xml_string.encode())
except xml.etree.ElementTree.ParseError as ve:
self.__print_error('Failed to parse XML' if errnote is None else errnote, fatal, video_id, ve)
@@ -1214,11 +1220,11 @@ class InfoExtractor:
def report_extraction(self, id_or_name):
"""Report information extraction."""
- self.to_screen('%s: Extracting information' % id_or_name)
+ self.to_screen(f'{id_or_name}: Extracting information')
def report_download_webpage(self, video_id):
"""Report webpage download."""
- self.to_screen('%s: Downloading webpage' % video_id)
+ self.to_screen(f'{video_id}: Downloading webpage')
def report_age_confirmation(self):
"""Report attempt to confirm age."""
@@ -1324,9 +1330,9 @@ class InfoExtractor:
elif default is not NO_DEFAULT:
return default
elif fatal:
- raise RegexNotFoundError('Unable to extract %s' % _name)
+ raise RegexNotFoundError(f'Unable to extract {_name}')
else:
- self.report_warning('unable to extract %s' % _name + bug_reports_message())
+ self.report_warning(f'unable to extract {_name}' + bug_reports_message())
return None
def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='',
@@ -1425,14 +1431,14 @@ class InfoExtractor:
if tfa is not None:
return tfa
- return getpass.getpass('Type %s and press [Return]: ' % note)
+ return getpass.getpass(f'Type {note} and press [Return]: ')
# Helper functions for extracting OpenGraph info
@staticmethod
def _og_regexes(prop):
content_re = r'content=(?:"([^"]+?)"|\'([^\']+?)\'|\s*([^\s"\'=<>`]+?)(?=\s|/?>))'
- property_re = (r'(?:name|property)=(?:\'og%(sep)s%(prop)s\'|"og%(sep)s%(prop)s"|\s*og%(sep)s%(prop)s\b)'
- % {'prop': re.escape(prop), 'sep': '(?:&#x3A;|[:-])'})
+ property_re = r'(?:name|property)=(?:\'og{sep}{prop}\'|"og{sep}{prop}"|\s*og{sep}{prop}\b)'.format(
+ prop=re.escape(prop), sep='(?:&#x3A;|[:-])')
template = r'<meta[^>]+?%s[^>]+?%s'
return [
template % (property_re, content_re),
@@ -1441,14 +1447,14 @@ class InfoExtractor:
@staticmethod
def _meta_regex(prop):
- return r'''(?isx)<meta
- (?=[^>]+(?:itemprop|name|property|id|http-equiv)=(["\']?)%s\1)
- [^>]+?content=(["\'])(?P<content>.*?)\2''' % re.escape(prop)
+ return rf'''(?isx)<meta
+ (?=[^>]+(?:itemprop|name|property|id|http-equiv)=(["\']?){re.escape(prop)}\1)
+ [^>]+?content=(["\'])(?P<content>.*?)\2'''
def _og_search_property(self, prop, html, name=None, **kargs):
prop = variadic(prop)
if name is None:
- name = 'OpenGraph %s' % prop[0]
+ name = f'OpenGraph {prop[0]}'
og_regexes = []
for p in prop:
og_regexes.extend(self._og_regexes(p))
@@ -1571,7 +1577,7 @@ class InfoExtractor:
elif fatal:
raise RegexNotFoundError('Unable to extract JSON-LD')
else:
- self.report_warning('unable to extract JSON-LD %s' % bug_reports_message())
+ self.report_warning(f'unable to extract JSON-LD {bug_reports_message()}')
return {}
def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
@@ -1593,8 +1599,8 @@ class InfoExtractor:
}
def is_type(e, *expected_types):
- type = variadic(traverse_obj(e, '@type'))
- return any(x in type for x in expected_types)
+ type_ = variadic(traverse_obj(e, '@type'))
+ return any(x in type_ for x in expected_types)
def extract_interaction_type(e):
interaction_type = e.get('interactionType')
@@ -1623,7 +1629,7 @@ class InfoExtractor:
count_kind = INTERACTION_TYPE_MAP.get(interaction_type.split('/')[-1])
if not count_kind:
continue
- count_key = '%s_count' % count_kind
+ count_key = f'{count_kind}_count'
if info.get(count_key) is not None:
continue
info[count_key] = interaction_count
@@ -1635,7 +1641,7 @@ class InfoExtractor:
'end_time': part.get('endOffset'),
} for part in variadic(e.get('hasPart') or []) if part.get('@type') == 'Clip']
for idx, (last_c, current_c, next_c) in enumerate(zip(
- [{'end_time': 0}] + chapters, chapters, chapters[1:])):
+ [{'end_time': 0}, *chapters], chapters, chapters[1:])):
current_c['end_time'] = current_c['end_time'] or next_c['start_time']
current_c['start_time'] = current_c['start_time'] or last_c['end_time']
if None in current_c.values():
@@ -1776,9 +1782,9 @@ class InfoExtractor:
def _hidden_inputs(html):
html = re.sub(r'<!--(?:(?!<!--).)*-->', '', html)
hidden_inputs = {}
- for input in re.findall(r'(?i)(<input[^>]+>)', html):
- attrs = extract_attributes(input)
- if not input:
+ for input_el in re.findall(r'(?i)(<input[^>]+>)', html):
+ attrs = extract_attributes(input_el)
+ if not input_el:
continue
if attrs.get('type') not in ('hidden', 'submit'):
continue
@@ -1790,8 +1796,8 @@ class InfoExtractor:
def _form_hidden_inputs(self, form_id, html):
form = self._search_regex(
- r'(?is)<form[^>]+?id=(["\'])%s\1[^>]*>(?P<form>.+?)</form>' % form_id,
- html, '%s form' % form_id, group='form')
+ rf'(?is)<form[^>]+?id=(["\']){form_id}\1[^>]*>(?P<form>.+?)</form>',
+ html, f'{form_id} form', group='form')
return self._hidden_inputs(form)
@classproperty(cache=True)
@@ -1821,7 +1827,7 @@ class InfoExtractor:
formats[:] = filter(
lambda f: self._is_valid_url(
f['url'], video_id,
- item='%s video format' % f.get('format_id') if f.get('format_id') else 'video'),
+ item='{} video format'.format(f.get('format_id')) if f.get('format_id') else 'video'),
formats)
@staticmethod
@@ -1837,15 +1843,14 @@ class InfoExtractor:
def _is_valid_url(self, url, video_id, item='video', headers={}):
url = self._proto_relative_url(url, scheme='http:')
# For now assume non HTTP(S) URLs always valid
- if not (url.startswith('http://') or url.startswith('https://')):
+ if not url.startswith(('http://', 'https://')):
return True
try:
- self._request_webpage(url, video_id, 'Checking %s URL' % item, headers=headers)
+ self._request_webpage(url, video_id, f'Checking {item} URL', headers=headers)
return True
except ExtractorError as e:
self.to_screen(
- '%s: %s URL is invalid, skipping: %s'
- % (video_id, item, error_to_compat_str(e.cause)))
+ f'{video_id}: {item} URL is invalid, skipping: {e.cause!s}')
return False
def http_scheme(self):
@@ -1899,8 +1904,8 @@ class InfoExtractor:
# currently yt-dlp cannot decode the playerVerificationChallenge as Akamai uses Adobe Alchemy
akamai_pv = manifest.find('{http://ns.adobe.com/f4m/1.0}pv-2.0')
if akamai_pv is not None and ';' in akamai_pv.text:
- playerVerificationChallenge = akamai_pv.text.split(';')[0]
- if playerVerificationChallenge.strip() != '':
+ player_verification_challenge = akamai_pv.text.split(';')[0]
+ if player_verification_challenge.strip() != '':
return []
formats = []
@@ -1946,7 +1951,7 @@ class InfoExtractor:
if not media_url:
continue
manifest_url = (
- media_url if media_url.startswith('http://') or media_url.startswith('https://')
+ media_url if media_url.startswith(('http://', 'https://'))
else ((manifest_base_url or '/'.join(manifest_url.split('/')[:-1])) + '/' + media_url))
# If media_url is itself a f4m manifest do the recursive extraction
# since bitrates in parent manifest (this one) and media_url manifest
@@ -2007,7 +2012,7 @@ class InfoExtractor:
def _report_ignoring_subs(self, name):
self.report_warning(bug_reports_message(
f'Ignoring subtitle tracks found in the {name} manifest; '
- 'if any subtitle tracks are missing,'
+ 'if any subtitle tracks are missing,',
), only_once=True)
def _extract_m3u8_formats(self, *args, **kwargs):
@@ -2098,7 +2103,7 @@ class InfoExtractor:
formats = [{
'format_id': join_nonempty(m3u8_id, idx),
'format_index': idx,
- 'url': m3u8_url or encode_data_uri(m3u8_doc.encode('utf-8'), 'application/x-mpegurl'),
+ 'url': m3u8_url or encode_data_uri(m3u8_doc.encode(), 'application/x-mpegurl'),
'ext': ext,
'protocol': entry_protocol,
'preference': preference,
@@ -2217,6 +2222,11 @@ class InfoExtractor:
'quality': quality,
'has_drm': has_drm,
}
+
+ # YouTube-specific
+ if yt_audio_content_id := last_stream_inf.get('YT-EXT-AUDIO-CONTENT-ID'):
+ f['language'] = yt_audio_content_id.split('.')[0]
+
resolution = last_stream_inf.get('RESOLUTION')
if resolution:
mobj = re.search(r'(?P<width>\d+)[xX](?P<height>\d+)', resolution)
@@ -2310,7 +2320,7 @@ class InfoExtractor:
if not c or c == '.':
out.append(c)
else:
- out.append('{%s}%s' % (namespace, c))
+ out.append(f'{{{namespace}}}{c}')
return '/'.join(out)
def _extract_smil_formats_and_subtitles(self, smil_url, video_id, fatal=True, f4m_params=None, transform_source=None):
@@ -2507,7 +2517,7 @@ class InfoExtractor:
imgs_count += 1
formats.append({
- 'format_id': 'imagestream-%d' % (imgs_count),
+ 'format_id': f'imagestream-{imgs_count}',
'url': src,
'ext': mimetype2ext(medium.get('type')),
'acodec': 'none',
@@ -2525,7 +2535,7 @@ class InfoExtractor:
def _parse_smil_subtitles(self, smil, namespace=None, subtitles_lang='en'):
urls = []
subtitles = {}
- for num, textstream in enumerate(smil.findall(self._xpath_ns('.//textstream', namespace))):
+ for textstream in smil.findall(self._xpath_ns('.//textstream', namespace)):
src = textstream.get('src')
if not src or src in urls:
continue
@@ -2656,7 +2666,7 @@ class InfoExtractor:
if subtitles and period['subtitles']:
self.report_warning(bug_reports_message(
'Found subtitles in multiple periods in the DASH manifest; '
- 'if part of the subtitles are missing,'
+ 'if part of the subtitles are missing,',
), only_once=True)
for sub_lang, sub_info in period['subtitles'].items():
@@ -2782,7 +2792,7 @@ class InfoExtractor:
elif mimetype2ext(mime_type) in ('tt', 'dfxp', 'ttml', 'xml', 'json'):
content_type = 'text'
else:
- self.report_warning('Unknown MIME type %s in DASH manifest' % mime_type)
+ self.report_warning(f'Unknown MIME type {mime_type} in DASH manifest')
continue
base_url = ''
@@ -2820,10 +2830,10 @@ class InfoExtractor:
'asr': int_or_none(representation_attrib.get('audioSamplingRate')),
'fps': int_or_none(representation_attrib.get('frameRate')),
'language': lang if lang not in ('mul', 'und', 'zxx', 'mis') else None,
- 'format_note': 'DASH %s' % content_type,
+ 'format_note': f'DASH {content_type}',
'filesize': filesize,
'container': mimetype2ext(mime_type) + '_dash',
- **codecs
+ **codecs,
}
elif content_type == 'text':
f = {
@@ -2864,8 +2874,8 @@ class InfoExtractor:
t += c
# Next, $...$ templates are translated to their
# %(...) counterparts to be used with % operator
- t = re.sub(r'\$(%s)\$' % '|'.join(identifiers), r'%(\1)d', t)
- t = re.sub(r'\$(%s)%%([^$]+)\$' % '|'.join(identifiers), r'%(\1)\2', t)
+ t = re.sub(r'\$({})\$'.format('|'.join(identifiers)), r'%(\1)d', t)
+ t = re.sub(r'\$({})%([^$]+)\$'.format('|'.join(identifiers)), r'%(\1)\2', t)
t.replace('$$', '$')
return t
@@ -2928,12 +2938,12 @@ class InfoExtractor:
'duration': float_or_none(segment_d, representation_ms_info['timescale']),
})
- for num, s in enumerate(representation_ms_info['s']):
+ for s in representation_ms_info['s']:
segment_time = s.get('t') or segment_time
segment_d = s['d']
add_segment_url()
segment_number += 1
- for r in range(s.get('r', 0)):
+ for _ in range(s.get('r', 0)):
segment_time += segment_d
add_segment_url()
segment_number += 1
@@ -2947,7 +2957,7 @@ class InfoExtractor:
timescale = representation_ms_info['timescale']
for s in representation_ms_info['s']:
duration = float_or_none(s['d'], timescale)
- for r in range(s.get('r', 0) + 1):
+ for _ in range(s.get('r', 0) + 1):
segment_uri = representation_ms_info['segment_urls'][segment_index]
fragments.append({
location_key(segment_uri): segment_uri,
@@ -3055,7 +3065,7 @@ class InfoExtractor:
fourcc = track.get('FourCC') or KNOWN_TAGS.get(track.get('AudioTag'))
# TODO: add support for WVC1 and WMAP
if fourcc not in ('H264', 'AVC1', 'AACL', 'TTML', 'EC-3'):
- self.report_warning('%s is not a supported codec' % fourcc)
+ self.report_warning(f'{fourcc} is not a supported codec')
continue
tbr = int(track.attrib['Bitrate']) // 1000
# [1] does not mention Width and Height attributes. However,
@@ -3104,7 +3114,7 @@ class InfoExtractor:
'fourcc': fourcc,
'language': stream_language,
'codec_private_data': track.get('CodecPrivateData'),
- }
+ },
})
elif stream_type in ('video', 'audio'):
formats.append({
@@ -3186,13 +3196,13 @@ class InfoExtractor:
_MEDIA_TAG_NAME_RE = r'(?:(?:amp|dl8(?:-live)?)-)?(video|audio)'
media_tags = [(media_tag, media_tag_name, media_type, '')
for media_tag, media_tag_name, media_type
- in re.findall(r'(?s)(<(%s)[^>]*/>)' % _MEDIA_TAG_NAME_RE, webpage)]
+ in re.findall(rf'(?s)(<({_MEDIA_TAG_NAME_RE})[^>]*/>)', webpage)]
media_tags.extend(re.findall(
# We only allow video|audio followed by a whitespace or '>'.
# Allowing more characters may end up in significant slow down (see
# https://github.com/ytdl-org/youtube-dl/issues/11979,
# e.g. http://www.porntrex.com/maps/videositemap.xml).
- r'(?s)(<(?P<tag>%s)(?:\s+[^>]*)?>)(.*?)</(?P=tag)>' % _MEDIA_TAG_NAME_RE, webpage))
+ rf'(?s)(<(?P<tag>{_MEDIA_TAG_NAME_RE})(?:\s+[^>]*)?>)(.*?)</(?P=tag)>', webpage))
for media_tag, _, media_type, media_content in media_tags:
media_info = {
'formats': [],
@@ -3336,13 +3346,13 @@ class InfoExtractor:
mobj = re.search(
r'(?:(?:http|rtmp|rtsp)(?P<s>s)?:)?(?P<url>//[^?]+)', url)
url_base = mobj.group('url')
- http_base_url = '%s%s:%s' % ('http', mobj.group('s') or '', url_base)
+ http_base_url = '{}{}:{}'.format('http', mobj.group('s') or '', url_base)
formats = []
def manifest_url(manifest):
m_url = f'{http_base_url}/{manifest}'
if query:
- m_url += '?%s' % query
+ m_url += f'?{query}'
return m_url
if 'm3u8' not in skip_protocols:
@@ -3364,7 +3374,7 @@ class InfoExtractor:
video_id, fatal=False)
for rtmp_format in rtmp_formats:
rtsp_format = rtmp_format.copy()
- rtsp_format['url'] = '%s/%s' % (rtmp_format['url'], rtmp_format['play_path'])
+ rtsp_format['url'] = '{}/{}'.format(rtmp_format['url'], rtmp_format['play_path'])
del rtsp_format['play_path']
del rtsp_format['ext']
rtsp_format.update({
@@ -3431,7 +3441,7 @@ class InfoExtractor:
if not track_url:
continue
subtitles.setdefault(track.get('label') or 'en', []).append({
- 'url': self._proto_relative_url(track_url)
+ 'url': self._proto_relative_url(track_url),
})
entry = {
@@ -3510,7 +3520,7 @@ class InfoExtractor:
'tbr': int_or_none(source.get('bitrate'), scale=1000),
'filesize': int_or_none(source.get('filesize')),
'ext': ext,
- 'format_id': format_id
+ 'format_id': format_id,
}
if source_url.startswith('rtmp'):
a_format['ext'] = 'flv'
@@ -3584,7 +3594,7 @@ class InfoExtractor:
continue
cookies = cookies.encode('iso-8859-1').decode('utf-8')
cookie_value = re.search(
- r'%s=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)' % cookie, cookies)
+ rf'{cookie}=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)', cookies)
if cookie_value:
value, domain = cookie_value.groups()
self._set_cookie(domain, cookie, value)
@@ -3668,7 +3678,7 @@ class InfoExtractor:
desc += ' (**Currently broken**)' if markdown else ' (Currently broken)'
# Escape emojis. Ref: https://github.com/github/markup/issues/1153
- name = (' - **%s**' % re.sub(r':(\w+:)', ':\u200B\\g<1>', cls.IE_NAME)) if markdown else cls.IE_NAME
+ name = (' - **{}**'.format(re.sub(r':(\w+:)', ':\u200B\\g<1>', cls.IE_NAME))) if markdown else cls.IE_NAME
return f'{name}:{desc}' if desc else name
def extract_subtitles(self, *args, **kwargs):
@@ -3708,7 +3718,7 @@ class InfoExtractor:
self.to_screen(f'Extracted {comment_count} comments')
return {
'comments': comments,
- 'comment_count': None if interrupted else comment_count
+ 'comment_count': None if interrupted else comment_count,
}
return extractor
@@ -3812,9 +3822,9 @@ class InfoExtractor:
@staticmethod
def _availability(is_private=None, needs_premium=None, needs_subscription=None, needs_auth=None, is_unlisted=None):
- all_known = all(map(
- lambda x: x is not None,
- (is_private, needs_premium, needs_subscription, needs_auth, is_unlisted)))
+ all_known = all(
+ x is not None for x in
+ (is_private, needs_premium, needs_subscription, needs_auth, is_unlisted))
return (
'private' if is_private
else 'premium_only' if needs_premium
@@ -3934,7 +3944,7 @@ class SearchInfoExtractor(InfoExtractor):
@classproperty
def _VALID_URL(cls):
- return r'%s(?P<prefix>|[1-9][0-9]*|all):(?P<query>[\s\S]+)' % cls._SEARCH_KEY
+ return rf'{cls._SEARCH_KEY}(?P<prefix>|[1-9][0-9]*|all):(?P<query>[\s\S]+)'
def _real_extract(self, query):
prefix, query = self._match_valid_url(query).group('prefix', 'query')