summaryrefslogtreecommitdiffstats
path: root/yt_dlp/YoutubeDL.py
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/YoutubeDL.py')
-rw-r--r--yt_dlp/YoutubeDL.py337
1 files changed, 178 insertions, 159 deletions
diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py
index 2c6f695..e56c3ed 100644
--- a/yt_dlp/YoutubeDL.py
+++ b/yt_dlp/YoutubeDL.py
@@ -4,6 +4,7 @@ import copy
import datetime as dt
import errno
import fileinput
+import functools
import http.cookiejar
import io
import itertools
@@ -24,7 +25,7 @@ import traceback
import unicodedata
from .cache import Cache
-from .compat import functools, urllib # isort: split
+from .compat import urllib # isort: split
from .compat import compat_os_name, urllib_req_to_req
from .cookies import LenientSimpleCookie, load_cookies
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
@@ -109,7 +110,6 @@ from .utils import (
determine_protocol,
encode_compat_str,
encodeFilename,
- error_to_compat_str,
escapeHTML,
expand_path,
extract_basic_auth,
@@ -159,7 +159,7 @@ from .utils import (
write_json_file,
write_string,
)
-from .utils._utils import _YDLLogger
+from .utils._utils import _UnsafeExtensionError, _YDLLogger
from .utils.networking import (
HTTPHeaderDict,
clean_headers,
@@ -172,6 +172,20 @@ if compat_os_name == 'nt':
import ctypes
+def _catch_unsafe_extension_error(func):
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ except _UnsafeExtensionError as error:
+ self.report_error(
+ f'The extracted extension ({error.extension!r}) is unusual '
+ 'and will be skipped for safety reasons. '
+ f'If you believe this is an error{bug_reports_message(",")}')
+
+ return wrapper
+
+
class YoutubeDL:
"""YoutubeDL class.
@@ -454,8 +468,9 @@ class YoutubeDL:
Set the value to 'native' to use the native downloader
compat_opts: Compatibility options. See "Differences in default behavior".
The following options do not work when used through the API:
- filename, abort-on-error, multistreams, no-live-chat, format-sort
- no-clean-infojson, no-playlist-metafiles, no-keep-subs, no-attach-info-json.
+ filename, abort-on-error, multistreams, no-live-chat,
+ format-sort, no-clean-infojson, no-playlist-metafiles,
+ no-keep-subs, no-attach-info-json, allow-unsafe-ext.
Refer __init__.py for their implementation
progress_template: Dictionary of templates for progress outputs.
Allowed keys are 'download', 'postprocess',
@@ -582,8 +597,9 @@ class YoutubeDL:
'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx', 'rows', 'columns',
'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start', 'is_dash_periods', 'request_data',
'preference', 'language', 'language_preference', 'quality', 'source_preference', 'cookies',
- 'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'extra_param_to_segment_url', 'hls_aes', 'downloader_options',
- 'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time'
+ 'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'extra_param_to_segment_url', 'extra_param_to_key_url',
+ 'hls_aes', 'downloader_options', 'page_url', 'app', 'play_path', 'tc_url', 'flash_version',
+ 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time',
}
_deprecated_multivalue_fields = {
'album_artist': 'album_artists',
@@ -594,7 +610,7 @@ class YoutubeDL:
}
_format_selection_exts = {
'audio': set(MEDIA_EXTENSIONS.common_audio),
- 'video': set(MEDIA_EXTENSIONS.common_video + ('3gp', )),
+ 'video': {*MEDIA_EXTENSIONS.common_video, '3gp'},
'storyboards': set(MEDIA_EXTENSIONS.storyboards),
}
@@ -628,7 +644,7 @@ class YoutubeDL:
error=sys.stderr,
screen=sys.stderr if self.params.get('quiet') else stdout,
console=None if compat_os_name == 'nt' else next(
- filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None)
+ filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
)
try:
@@ -679,9 +695,9 @@ class YoutubeDL:
width_args = [] if width is None else ['-w', str(width)]
sp_kwargs = {'stdin': subprocess.PIPE, 'stdout': slave, 'stderr': self._out_files.error}
try:
- self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
+ self._output_process = Popen(['bidiv', *width_args], **sp_kwargs)
except OSError:
- self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
+ self._output_process = Popen(['fribidi', '-c', 'UTF-8', *width_args], **sp_kwargs)
self._output_channel = os.fdopen(master, 'rb')
except OSError as ose:
if ose.errno == errno.ENOENT:
@@ -822,8 +838,7 @@ class YoutubeDL:
)
self.report_warning(
'Long argument string detected. '
- 'Use -- to separate parameters and URLs, like this:\n%s' %
- shell_quote(correct_argv))
+ f'Use -- to separate parameters and URLs, like this:\n{shell_quote(correct_argv)}')
def add_info_extractor(self, ie):
"""Add an InfoExtractor object to the end of the list."""
@@ -922,7 +937,7 @@ class YoutubeDL:
if (self.params.get('quiet') if quiet is None else quiet) and not self.params.get('verbose'):
return
self._write_string(
- '%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
+ '{}{}'.format(self._bidi_workaround(message), ('' if skip_eol else '\n')),
self._out_files.screen, only_once=only_once)
def to_stderr(self, message, only_once=False):
@@ -1045,10 +1060,10 @@ class YoutubeDL:
return self._format_text(self._out_files.error, self._allow_colors.error, *args, **kwargs)
def report_warning(self, message, only_once=False):
- '''
+ """
Print the message to stderr, it will be prefixed with 'WARNING:'
If stderr is a tty file the 'WARNING:' will be colored
- '''
+ """
if self.params.get('logger') is not None:
self.params['logger'].warning(message)
else:
@@ -1066,14 +1081,14 @@ class YoutubeDL:
self.to_stderr(f'{self._format_err("Deprecated Feature:", self.Styles.ERROR)} {message}', True)
def report_error(self, message, *args, **kwargs):
- '''
+ """
Do the same as trouble, but prefixes the message with 'ERROR:', colored
in red if stderr is a tty file.
- '''
+ """
self.trouble(f'{self._format_err("ERROR:", self.Styles.ERROR)} {message}', *args, **kwargs)
def write_debug(self, message, only_once=False):
- '''Log debug message or Print message to stderr'''
+ """Log debug message or Print message to stderr"""
if not self.params.get('verbose', False):
return
message = f'[debug] {message}'
@@ -1085,14 +1100,14 @@ class YoutubeDL:
def report_file_already_downloaded(self, file_name):
"""Report file has already been fully downloaded."""
try:
- self.to_screen('[download] %s has already been downloaded' % file_name)
+ self.to_screen(f'[download] {file_name} has already been downloaded')
except UnicodeEncodeError:
self.to_screen('[download] The file has already been downloaded')
def report_file_delete(self, file_name):
"""Report that existing file will be deleted."""
try:
- self.to_screen('Deleting existing file %s' % file_name)
+ self.to_screen(f'Deleting existing file {file_name}')
except UnicodeEncodeError:
self.to_screen('Deleting existing file')
@@ -1147,7 +1162,7 @@ class YoutubeDL:
@staticmethod
def escape_outtmpl(outtmpl):
- ''' Escape any remaining strings like %s, %abc% etc. '''
+ """ Escape any remaining strings like %s, %abc% etc. """
return re.sub(
STR_FORMAT_RE_TMPL.format('', '(?![%(\0])'),
lambda mobj: ('' if mobj.group('has_key') else '%') + mobj.group(0),
@@ -1155,7 +1170,7 @@ class YoutubeDL:
@classmethod
def validate_outtmpl(cls, outtmpl):
- ''' @return None or Exception object '''
+ """ @return None or Exception object """
outtmpl = re.sub(
STR_FORMAT_RE_TMPL.format('[^)]*', '[ljhqBUDS]'),
lambda mobj: f'{mobj.group(0)[:-1]}s',
@@ -1208,13 +1223,13 @@ class YoutubeDL:
}
# Field is of the form key1.key2...
# where keys (except first) can be string, int, slice or "{field, ...}"
- FIELD_INNER_RE = r'(?:\w+|%(num)s|%(num)s?(?::%(num)s?){1,2})' % {'num': r'(?:-?\d+)'}
- FIELD_RE = r'\w*(?:\.(?:%(inner)s|{%(field)s(?:,%(field)s)*}))*' % {
+ FIELD_INNER_RE = r'(?:\w+|%(num)s|%(num)s?(?::%(num)s?){1,2})' % {'num': r'(?:-?\d+)'} # noqa: UP031
+ FIELD_RE = r'\w*(?:\.(?:%(inner)s|{%(field)s(?:,%(field)s)*}))*' % { # noqa: UP031
'inner': FIELD_INNER_RE,
- 'field': rf'\w*(?:\.{FIELD_INNER_RE})*'
+ 'field': rf'\w*(?:\.{FIELD_INNER_RE})*',
}
MATH_FIELD_RE = rf'(?:{FIELD_RE}|-?{NUMBER_RE})'
- MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
+ MATH_OPERATORS_RE = r'(?:{})'.format('|'.join(map(re.escape, MATH_FUNCTIONS.keys())))
INTERNAL_FORMAT_RE = re.compile(rf'''(?xs)
(?P<negate>-)?
(?P<fields>{FIELD_RE})
@@ -1337,7 +1352,7 @@ class YoutubeDL:
value, default = None, na
fmt = outer_mobj.group('format')
- if fmt == 's' and last_field in field_size_compat_map.keys() and isinstance(value, int):
+ if fmt == 's' and last_field in field_size_compat_map and isinstance(value, int):
fmt = f'0{field_size_compat_map[last_field]:d}d'
flags = outer_mobj.group('conversion') or ''
@@ -1362,7 +1377,7 @@ class YoutubeDL:
elif fmt[-1] == 'U': # unicode normalized
value, fmt = unicodedata.normalize(
# "+" = compatibility equivalence, "#" = NFD
- 'NF%s%s' % ('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
+ 'NF{}{}'.format('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
value), str_fmt
elif fmt[-1] == 'D': # decimal suffix
num_fmt, fmt = fmt[:-1].replace('#', ''), 's'
@@ -1390,7 +1405,7 @@ class YoutubeDL:
if fmt[-1] in 'csra':
value = sanitizer(last_field, value)
- key = '%s\0%s' % (key.replace('%', '%\0'), outer_mobj.group('format'))
+ key = '{}\0{}'.format(key.replace('%', '%\0'), outer_mobj.group('format'))
TMPL_DICT[key] = value
return '{prefix}%({key}){fmt}'.format(key=key, fmt=fmt, prefix=outer_mobj.group('prefix'))
@@ -1400,6 +1415,7 @@ class YoutubeDL:
outtmpl, info_dict = self.prepare_outtmpl(outtmpl, info_dict, *args, **kwargs)
return self.escape_outtmpl(outtmpl) % info_dict
+ @_catch_unsafe_extension_error
def _prepare_filename(self, info_dict, *, outtmpl=None, tmpl_type=None):
assert None in (outtmpl, tmpl_type), 'outtmpl and tmpl_type are mutually exclusive'
if outtmpl is None:
@@ -1479,9 +1495,9 @@ class YoutubeDL:
date = info_dict.get('upload_date')
if date is not None:
- dateRange = self.params.get('daterange', DateRange())
- if date not in dateRange:
- return f'{date_from_str(date).isoformat()} upload date is not in range {dateRange}'
+ date_range = self.params.get('daterange', DateRange())
+ if date not in date_range:
+ return f'{date_from_str(date).isoformat()} upload date is not in range {date_range}'
view_count = info_dict.get('view_count')
if view_count is not None:
min_views = self.params.get('min_views')
@@ -1491,7 +1507,7 @@ class YoutubeDL:
if max_views is not None and view_count > max_views:
return 'Skipping %s, because it has exceeded the maximum view count (%d/%d)' % (video_title, view_count, max_views)
if age_restricted(info_dict.get('age_limit'), self.params.get('age_limit')):
- return 'Skipping "%s" because it is age restricted' % video_title
+ return f'Skipping "{video_title}" because it is age restricted'
match_filter = self.params.get('match_filter')
if match_filter is None:
@@ -1544,7 +1560,7 @@ class YoutubeDL:
@staticmethod
def add_extra_info(info_dict, extra_info):
- '''Set the keys from extra_info in info dict if they are missing'''
+ """Set the keys from extra_info in info dict if they are missing"""
for key, value in extra_info.items():
info_dict.setdefault(key, value)
@@ -1590,7 +1606,7 @@ class YoutubeDL:
self.to_screen(f'[download] {self._format_screen(temp_id, self.Styles.ID)}: '
'has already been recorded in the archive')
if self.params.get('break_on_existing', False):
- raise ExistingVideoReached()
+ raise ExistingVideoReached
break
return self.__extract_info(url, self.get_info_extractor(key), download, extra_info, process)
else:
@@ -1616,8 +1632,8 @@ class YoutubeDL:
except GeoRestrictedError as e:
msg = e.msg
if e.countries:
- msg += '\nThis video is available in %s.' % ', '.join(
- map(ISO3166Utils.short2full, e.countries))
+ msg += '\nThis video is available in {}.'.format(', '.join(
+ map(ISO3166Utils.short2full, e.countries)))
msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to workaround.'
self.report_error(msg)
except ExtractorError as e: # An error we somewhat expected
@@ -1826,8 +1842,8 @@ class YoutubeDL:
if isinstance(additional_urls, str):
additional_urls = [additional_urls]
self.to_screen(
- '[info] %s: %d additional URL(s) requested' % (ie_result['id'], len(additional_urls)))
- self.write_debug('Additional URLs: "%s"' % '", "'.join(additional_urls))
+ '[info] {}: {} additional URL(s) requested'.format(ie_result['id'], len(additional_urls)))
+ self.write_debug('Additional URLs: "{}"'.format('", "'.join(additional_urls)))
ie_result['additional_entries'] = [
self.extract_info(
url, download, extra_info=extra_info,
@@ -1879,8 +1895,8 @@ class YoutubeDL:
webpage_url = ie_result.get('webpage_url') # Playlists maynot have webpage_url
if webpage_url and webpage_url in self._playlist_urls:
self.to_screen(
- '[download] Skipping already downloaded playlist: %s'
- % ie_result.get('title') or ie_result.get('id'))
+ '[download] Skipping already downloaded playlist: {}'.format(
+ ie_result.get('title')) or ie_result.get('id'))
return
self._playlist_level += 1
@@ -1895,8 +1911,8 @@ class YoutubeDL:
self._playlist_urls.clear()
elif result_type == 'compat_list':
self.report_warning(
- 'Extractor %s returned a compat_list result. '
- 'It needs to be updated.' % ie_result.get('extractor'))
+ 'Extractor {} returned a compat_list result. '
+ 'It needs to be updated.'.format(ie_result.get('extractor')))
def _fixup(r):
self.add_extra_info(r, {
@@ -1913,7 +1929,7 @@ class YoutubeDL:
]
return ie_result
else:
- raise Exception('Invalid result type: %s' % result_type)
+ raise Exception(f'Invalid result type: {result_type}')
def _ensure_dir_exists(self, path):
return make_dir(path, self.report_error)
@@ -1927,6 +1943,8 @@ class YoutubeDL:
'playlist_title': ie_result.get('title'),
'playlist_uploader': ie_result.get('uploader'),
'playlist_uploader_id': ie_result.get('uploader_id'),
+ 'playlist_channel': ie_result.get('channel'),
+ 'playlist_channel_id': ie_result.get('channel_id'),
**kwargs,
}
if strict:
@@ -2029,8 +2047,9 @@ class YoutubeDL:
resolved_entries[i] = (playlist_index, NO_DEFAULT)
continue
- self.to_screen('[download] Downloading item %s of %s' % (
- self._format_screen(i + 1, self.Styles.ID), self._format_screen(n_entries, self.Styles.EMPHASIS)))
+ self.to_screen(
+ f'[download] Downloading item {self._format_screen(i + 1, self.Styles.ID)} '
+ f'of {self._format_screen(n_entries, self.Styles.EMPHASIS)}')
entry_result = self.__process_iterable_entry(entry, download, collections.ChainMap({
'playlist_index': playlist_index,
@@ -2080,9 +2099,9 @@ class YoutubeDL:
}
operator_rex = re.compile(r'''(?x)\s*
(?P<key>[\w.-]+)\s*
- (?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ (?P<op>{})(?P<none_inclusive>\s*\?)?\s*
(?P<value>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)\s*
- ''' % '|'.join(map(re.escape, OPERATORS.keys())))
+ '''.format('|'.join(map(re.escape, OPERATORS.keys()))))
m = operator_rex.fullmatch(filter_spec)
if m:
try:
@@ -2093,7 +2112,7 @@ class YoutubeDL:
comparison_value = parse_filesize(m.group('value') + 'B')
if comparison_value is None:
raise ValueError(
- 'Invalid value %r in format specification %r' % (
+ 'Invalid value {!r} in format specification {!r}'.format(
m.group('value'), filter_spec))
op = OPERATORS[m.group('op')]
@@ -2103,15 +2122,15 @@ class YoutubeDL:
'^=': lambda attr, value: attr.startswith(value),
'$=': lambda attr, value: attr.endswith(value),
'*=': lambda attr, value: value in attr,
- '~=': lambda attr, value: value.search(attr) is not None
+ '~=': lambda attr, value: value.search(attr) is not None,
}
str_operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-zA-Z0-9._-]+)\s*
- (?P<negation>!\s*)?(?P<op>%s)\s*(?P<none_inclusive>\?\s*)?
+ (?P<negation>!\s*)?(?P<op>{})\s*(?P<none_inclusive>\?\s*)?
(?P<quote>["'])?
(?P<value>(?(quote)(?:(?!(?P=quote))[^\\]|\\.)+|[\w.-]+))
(?(quote)(?P=quote))\s*
- ''' % '|'.join(map(re.escape, STR_OPERATORS.keys())))
+ '''.format('|'.join(map(re.escape, STR_OPERATORS.keys()))))
m = str_operator_rex.fullmatch(filter_spec)
if m:
if m.group('op') == '~=':
@@ -2125,7 +2144,7 @@ class YoutubeDL:
op = str_op
if not m:
- raise SyntaxError('Invalid filter specification %r' % filter_spec)
+ raise SyntaxError(f'Invalid filter specification {filter_spec!r}')
def _filter(f):
actual_value = f.get(m.group('key'))
@@ -2141,7 +2160,7 @@ class YoutubeDL:
if working:
yield f
continue
- self.to_screen('[info] Testing format %s' % f['format_id'])
+ self.to_screen('[info] Testing format {}'.format(f['format_id']))
path = self.get_output_path('temp')
if not self._ensure_dir_exists(f'{path}/'):
continue
@@ -2149,19 +2168,19 @@ class YoutubeDL:
temp_file.close()
try:
success, _ = self.dl(temp_file.name, f, test=True)
- except (DownloadError, OSError, ValueError) + network_exceptions:
+ except (DownloadError, OSError, ValueError, *network_exceptions):
success = False
finally:
if os.path.exists(temp_file.name):
try:
os.remove(temp_file.name)
except OSError:
- self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
+ self.report_warning(f'Unable to delete temporary file "{temp_file.name}"')
f['__working'] = success
if success:
yield f
else:
- self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
+ self.to_screen('[info] Unable to download format {}. Skipping...'.format(f['format_id']))
def _select_formats(self, formats, selector):
return list(selector({
@@ -2214,8 +2233,8 @@ class YoutubeDL:
def _parse_filter(tokens):
filter_parts = []
- for type, string_, start, _, _ in tokens:
- if type == tokenize.OP and string_ == ']':
+ for type_, string_, _start, _, _ in tokens:
+ if type_ == tokenize.OP and string_ == ']':
return ''.join(filter_parts)
else:
filter_parts.append(string_)
@@ -2225,23 +2244,23 @@ class YoutubeDL:
# E.g. 'mp4' '-' 'baseline' '-' '16x9' is converted to 'mp4-baseline-16x9'
ALLOWED_OPS = ('/', '+', ',', '(', ')')
last_string, last_start, last_end, last_line = None, None, None, None
- for type, string_, start, end, line in tokens:
- if type == tokenize.OP and string_ == '[':
+ for type_, string_, start, end, line in tokens:
+ if type_ == tokenize.OP and string_ == '[':
if last_string:
yield tokenize.NAME, last_string, last_start, last_end, last_line
last_string = None
- yield type, string_, start, end, line
+ yield type_, string_, start, end, line
# everything inside brackets will be handled by _parse_filter
- for type, string_, start, end, line in tokens:
- yield type, string_, start, end, line
- if type == tokenize.OP and string_ == ']':
+ for type_, string_, start, end, line in tokens:
+ yield type_, string_, start, end, line
+ if type_ == tokenize.OP and string_ == ']':
break
- elif type == tokenize.OP and string_ in ALLOWED_OPS:
+ elif type_ == tokenize.OP and string_ in ALLOWED_OPS:
if last_string:
yield tokenize.NAME, last_string, last_start, last_end, last_line
last_string = None
- yield type, string_, start, end, line
- elif type in [tokenize.NAME, tokenize.NUMBER, tokenize.OP]:
+ yield type_, string_, start, end, line
+ elif type_ in [tokenize.NAME, tokenize.NUMBER, tokenize.OP]:
if not last_string:
last_string = string_
last_start = start
@@ -2254,13 +2273,13 @@ class YoutubeDL:
def _parse_format_selection(tokens, inside_merge=False, inside_choice=False, inside_group=False):
selectors = []
current_selector = None
- for type, string_, start, _, _ in tokens:
+ for type_, string_, start, _, _ in tokens:
# ENCODING is only defined in Python 3.x
- if type == getattr(tokenize, 'ENCODING', None):
+ if type_ == getattr(tokenize, 'ENCODING', None):
continue
- elif type in [tokenize.NAME, tokenize.NUMBER]:
+ elif type_ in [tokenize.NAME, tokenize.NUMBER]:
current_selector = FormatSelector(SINGLE, string_, [])
- elif type == tokenize.OP:
+ elif type_ == tokenize.OP:
if string_ == ')':
if not inside_group:
# ')' will be handled by the parentheses group
@@ -2303,7 +2322,7 @@ class YoutubeDL:
current_selector = FormatSelector(MERGE, (selector_1, selector_2), [])
else:
raise syntax_error(f'Operator not recognized: "{string_}"', start)
- elif type == tokenize.ENDMARKER:
+ elif type_ == tokenize.ENDMARKER:
break
if current_selector:
selectors.append(current_selector)
@@ -2378,7 +2397,7 @@ class YoutubeDL:
'acodec': the_only_audio.get('acodec'),
'abr': the_only_audio.get('abr'),
'asr': the_only_audio.get('asr'),
- 'audio_channels': the_only_audio.get('audio_channels')
+ 'audio_channels': the_only_audio.get('audio_channels'),
})
return new_dict
@@ -2459,9 +2478,9 @@ class YoutubeDL:
format_fallback = not format_type and not format_modified # for b, w
_filter_f = (
- (lambda f: f.get('%scodec' % format_type) != 'none')
+ (lambda f: f.get(f'{format_type}codec') != 'none')
if format_type and format_modified # bv*, ba*, wv*, wa*
- else (lambda f: f.get('%scodec' % not_format_type) == 'none')
+ else (lambda f: f.get(f'{not_format_type}codec') == 'none')
if format_type # bv, ba, wv, wa
else (lambda f: f.get('vcodec') != 'none' and f.get('acodec') != 'none')
if not format_modified # b, w
@@ -2529,7 +2548,7 @@ class YoutubeDL:
def __next__(self):
if self.counter >= len(self.tokens):
- raise StopIteration()
+ raise StopIteration
value = self.tokens[self.counter]
self.counter += 1
return value
@@ -2612,7 +2631,7 @@ class YoutubeDL:
self._sort_thumbnails(thumbnails)
for i, t in enumerate(thumbnails):
if t.get('id') is None:
- t['id'] = '%d' % i
+ t['id'] = str(i)
if t.get('width') and t.get('height'):
t['resolution'] = '%dx%d' % (t['width'], t['height'])
t['url'] = sanitize_url(t['url'])
@@ -2673,8 +2692,8 @@ class YoutubeDL:
# Auto generate title fields corresponding to the *_number fields when missing
# in order to always have clean titles. This is very common for TV series.
for field in ('chapter', 'season', 'episode'):
- if final and info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
- info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
+ if final and info_dict.get(f'{field}_number') is not None and not info_dict.get(field):
+ info_dict[field] = '%s %d' % (field.capitalize(), info_dict[f'{field}_number'])
for old_key, new_key in self._deprecated_multivalue_fields.items():
if new_key in info_dict and old_key in info_dict:
@@ -2706,8 +2725,8 @@ class YoutubeDL:
def report_force_conversion(field, field_not, conversion):
self.report_warning(
- '"%s" field is not %s - forcing %s conversion, there is an error in extractor'
- % (field, field_not, conversion))
+ f'"{field}" field is not {field_not} - forcing {conversion} conversion, '
+ 'there is an error in extractor')
def sanitize_string_field(info, string_field):
field = info.get(string_field)
@@ -2824,28 +2843,28 @@ class YoutubeDL:
if not formats:
self.raise_no_formats(info_dict)
- for format in formats:
- sanitize_string_field(format, 'format_id')
- sanitize_numeric_fields(format)
- format['url'] = sanitize_url(format['url'])
- if format.get('ext') is None:
- format['ext'] = determine_ext(format['url']).lower()
- if format['ext'] in ('aac', 'opus', 'mp3', 'flac', 'vorbis'):
- if format.get('acodec') is None:
- format['acodec'] = format['ext']
- if format.get('protocol') is None:
- format['protocol'] = determine_protocol(format)
- if format.get('resolution') is None:
- format['resolution'] = self.format_resolution(format, default=None)
- if format.get('dynamic_range') is None and format.get('vcodec') != 'none':
- format['dynamic_range'] = 'SDR'
- if format.get('aspect_ratio') is None:
- format['aspect_ratio'] = try_call(lambda: round(format['width'] / format['height'], 2))
+ for fmt in formats:
+ sanitize_string_field(fmt, 'format_id')
+ sanitize_numeric_fields(fmt)
+ fmt['url'] = sanitize_url(fmt['url'])
+ if fmt.get('ext') is None:
+ fmt['ext'] = determine_ext(fmt['url']).lower()
+ if fmt['ext'] in ('aac', 'opus', 'mp3', 'flac', 'vorbis'):
+ if fmt.get('acodec') is None:
+ fmt['acodec'] = fmt['ext']
+ if fmt.get('protocol') is None:
+ fmt['protocol'] = determine_protocol(fmt)
+ if fmt.get('resolution') is None:
+ fmt['resolution'] = self.format_resolution(fmt, default=None)
+ if fmt.get('dynamic_range') is None and fmt.get('vcodec') != 'none':
+ fmt['dynamic_range'] = 'SDR'
+ if fmt.get('aspect_ratio') is None:
+ fmt['aspect_ratio'] = try_call(lambda: round(fmt['width'] / fmt['height'], 2))
# For fragmented formats, "tbr" is often max bitrate and not average
- if (('manifest-filesize-approx' in self.params['compat_opts'] or not format.get('manifest_url'))
- and not format.get('filesize') and not format.get('filesize_approx')):
- format['filesize_approx'] = filesize_from_tbr(format.get('tbr'), info_dict.get('duration'))
- format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True)
+ if (('manifest-filesize-approx' in self.params['compat_opts'] or not fmt.get('manifest_url'))
+ and not fmt.get('filesize') and not fmt.get('filesize_approx')):
+ fmt['filesize_approx'] = filesize_from_tbr(fmt.get('tbr'), info_dict.get('duration'))
+ fmt['http_headers'] = self._calc_headers(collections.ChainMap(fmt, info_dict), load_cookies=True)
# Safeguard against old/insecure infojson when using --load-info-json
if info_dict.get('http_headers'):
@@ -2858,36 +2877,36 @@ class YoutubeDL:
self.sort_formats({
'formats': formats,
- '_format_sort_fields': info_dict.get('_format_sort_fields')
+ '_format_sort_fields': info_dict.get('_format_sort_fields'),
})
# Sanitize and group by format_id
formats_dict = {}
- for i, format in enumerate(formats):
- if not format.get('format_id'):
- format['format_id'] = str(i)
+ for i, fmt in enumerate(formats):
+ if not fmt.get('format_id'):
+ fmt['format_id'] = str(i)
else:
# Sanitize format_id from characters used in format selector expression
- format['format_id'] = re.sub(r'[\s,/+\[\]()]', '_', format['format_id'])
- formats_dict.setdefault(format['format_id'], []).append(format)
+ fmt['format_id'] = re.sub(r'[\s,/+\[\]()]', '_', fmt['format_id'])
+ formats_dict.setdefault(fmt['format_id'], []).append(fmt)
# Make sure all formats have unique format_id
common_exts = set(itertools.chain(*self._format_selection_exts.values()))
for format_id, ambiguous_formats in formats_dict.items():
ambigious_id = len(ambiguous_formats) > 1
- for i, format in enumerate(ambiguous_formats):
+ for i, fmt in enumerate(ambiguous_formats):
if ambigious_id:
- format['format_id'] = '%s-%d' % (format_id, i)
+ fmt['format_id'] = f'{format_id}-{i}'
# Ensure there is no conflict between id and ext in format selection
# See https://github.com/yt-dlp/yt-dlp/issues/1282
- if format['format_id'] != format['ext'] and format['format_id'] in common_exts:
- format['format_id'] = 'f%s' % format['format_id']
-
- if format.get('format') is None:
- format['format'] = '{id} - {res}{note}'.format(
- id=format['format_id'],
- res=self.format_resolution(format),
- note=format_field(format, 'format_note', ' (%s)'),
+ if fmt['format_id'] != fmt['ext'] and fmt['format_id'] in common_exts:
+ fmt['format_id'] = 'f{}'.format(fmt['format_id'])
+
+ if fmt.get('format') is None:
+ fmt['format'] = '{id} - {res}{note}'.format(
+ id=fmt['format_id'],
+ res=self.format_resolution(fmt),
+ note=format_field(fmt, 'format_note', ' (%s)'),
)
if self.params.get('check_formats') is True:
@@ -3009,7 +3028,7 @@ class YoutubeDL:
info_dict['requested_downloads'] = downloaded_formats
info_dict = self.run_all_pps('after_video', info_dict)
if max_downloads_reached:
- raise MaxDownloadsReached()
+ raise MaxDownloadsReached
# We update the info dict with the selected best quality format (backwards compatibility)
info_dict.update(best_format)
@@ -3070,8 +3089,8 @@ class YoutubeDL:
else:
f = formats[-1]
self.report_warning(
- 'No subtitle format found matching "%s" for language %s, '
- 'using %s. Use --list-subs for a list of available subtitles' % (formats_query, lang, f['ext']))
+ 'No subtitle format found matching "{}" for language {}, '
+ 'using {}. Use --list-subs for a list of available subtitles'.format(formats_query, lang, f['ext']))
subs[lang] = f
return subs
@@ -3189,6 +3208,7 @@ class YoutubeDL:
os.remove(file)
return None
+ @_catch_unsafe_extension_error
def process_info(self, info_dict):
"""Process a single resolved IE result. (Modifies it in-place)"""
@@ -3226,7 +3246,7 @@ class YoutubeDL:
def check_max_downloads():
if self._num_downloads >= float(self.params.get('max_downloads') or 'inf'):
- raise MaxDownloadsReached()
+ raise MaxDownloadsReached
if self.params.get('simulate'):
info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
@@ -3400,7 +3420,7 @@ class YoutubeDL:
for f in info_dict['requested_formats'] if fd != FFmpegFD else []:
f['filepath'] = fname = prepend_extension(
correct_ext(temp_filename, info_dict['ext']),
- 'f%s' % f['format_id'], info_dict['ext'])
+ 'f{}'.format(f['format_id']), info_dict['ext'])
downloaded.append(fname)
info_dict['url'] = '\n'.join(f['url'] for f in info_dict['requested_formats'])
success, real_download = self.dl(temp_filename, info_dict)
@@ -3433,7 +3453,7 @@ class YoutubeDL:
if temp_filename != '-':
fname = prepend_extension(
correct_ext(temp_filename, new_info['ext']),
- 'f%s' % f['format_id'], new_info['ext'])
+ 'f{}'.format(f['format_id']), new_info['ext'])
if not self._ensure_dir_exists(fname):
return
f['filepath'] = fname
@@ -3465,11 +3485,11 @@ class YoutubeDL:
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
except network_exceptions as err:
- self.report_error('unable to download video data: %s' % error_to_compat_str(err))
+ self.report_error(f'unable to download video data: {err}')
return
except OSError as err:
raise UnavailableVideoError(err)
- except (ContentTooShortError, ) as err:
+ except ContentTooShortError as err:
self.report_error(f'content too short (expected {err.expected} bytes and served {err.downloaded})')
return
@@ -3536,13 +3556,13 @@ class YoutubeDL:
try:
replace_info_dict(self.post_process(dl_filename, info_dict, files_to_move))
except PostProcessingError as err:
- self.report_error('Postprocessing: %s' % str(err))
+ self.report_error(f'Postprocessing: {err}')
return
try:
for ph in self._post_hooks:
ph(info_dict['filepath'])
except Exception as err:
- self.report_error('post hooks: %s' % str(err))
+ self.report_error(f'post hooks: {err}')
return
info_dict['__write_download_archive'] = True
@@ -3609,7 +3629,7 @@ class YoutubeDL:
@staticmethod
def sanitize_info(info_dict, remove_private_keys=False):
- ''' Sanitize the infodict for converting to json '''
+ """ Sanitize the infodict for converting to json """
if info_dict is None:
return info_dict
info_dict.setdefault('epoch', int(time.time()))
@@ -3644,7 +3664,7 @@ class YoutubeDL:
@staticmethod
def filter_requested_info(info_dict, actually_filter=True):
- ''' Alias of sanitize_info for backward compatibility '''
+ """ Alias of sanitize_info for backward compatibility """
return YoutubeDL.sanitize_info(info_dict, actually_filter)
def _delete_downloaded_files(self, *files_to_delete, info={}, msg=None):
@@ -3666,7 +3686,7 @@ class YoutubeDL:
actual_post_extract(video_dict or {})
return
- post_extractor = info_dict.pop('__post_extractor', None) or (lambda: {})
+ post_extractor = info_dict.pop('__post_extractor', None) or dict
info_dict.update(post_extractor())
actual_post_extract(info_dict or {})
@@ -3771,7 +3791,7 @@ class YoutubeDL:
if format.get('width') and format.get('height'):
return '%dx%d' % (format['width'], format['height'])
elif format.get('height'):
- return '%sp' % format['height']
+ return '{}p'.format(format['height'])
elif format.get('width'):
return '%dx?' % format['width']
return default
@@ -3788,7 +3808,7 @@ class YoutubeDL:
if fdict.get('language'):
if res:
res += ' '
- res += '[%s]' % fdict['language']
+ res += '[{}]'.format(fdict['language'])
if fdict.get('format_note') is not None:
if res:
res += ' '
@@ -3800,7 +3820,7 @@ class YoutubeDL:
if fdict.get('container') is not None:
if res:
res += ', '
- res += '%s container' % fdict['container']
+ res += '{} container'.format(fdict['container'])
if (fdict.get('vcodec') is not None
and fdict.get('vcodec') != 'none'):
if res:
@@ -3815,7 +3835,7 @@ class YoutubeDL:
if fdict.get('fps') is not None:
if res:
res += ', '
- res += '%sfps' % fdict['fps']
+ res += '{}fps'.format(fdict['fps'])
if fdict.get('acodec') is not None:
if res:
res += ', '
@@ -3858,7 +3878,7 @@ class YoutubeDL:
format_field(f, 'format_id'),
format_field(f, 'ext'),
self.format_resolution(f),
- self._format_note(f)
+ self._format_note(f),
] for f in formats if (f.get('preference') or 0) >= -1000]
return render_table(['format code', 'extension', 'resolution', 'note'], table, extra_gap=1)
@@ -3964,11 +3984,11 @@ class YoutubeDL:
from .extractor.extractors import _LAZY_LOADER
from .extractor.extractors import (
_PLUGIN_CLASSES as plugin_ies,
- _PLUGIN_OVERRIDES as plugin_ie_overrides
+ _PLUGIN_OVERRIDES as plugin_ie_overrides,
)
def get_encoding(stream):
- ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
+ ret = str(getattr(stream, 'encoding', f'missing ({type(stream).__name__})'))
additional_info = []
if os.environ.get('TERM', '').lower() == 'dumb':
additional_info.append('dumb')
@@ -3979,13 +3999,13 @@ class YoutubeDL:
ret = f'{ret} ({",".join(additional_info)})'
return ret
- encoding_str = 'Encodings: locale %s, fs %s, pref %s, %s' % (
+ encoding_str = 'Encodings: locale {}, fs {}, pref {}, {}'.format(
locale.getpreferredencoding(),
sys.getfilesystemencoding(),
self.get_encoding(),
', '.join(
f'{key} {get_encoding(stream)}' for key, stream in self._out_files.items_
- if stream is not None and key != 'console')
+ if stream is not None and key != 'console'),
)
logger = self.params.get('logger')
@@ -4017,7 +4037,7 @@ class YoutubeDL:
else:
write_debug('Lazy loading extractors is disabled')
if self.params['compat_opts']:
- write_debug('Compatibility options: %s' % ', '.join(self.params['compat_opts']))
+ write_debug('Compatibility options: {}'.format(', '.join(self.params['compat_opts'])))
if current_git_head():
write_debug(f'Git HEAD: {current_git_head()}')
@@ -4026,14 +4046,14 @@ class YoutubeDL:
exe_versions, ffmpeg_features = FFmpegPostProcessor.get_versions_and_features(self)
ffmpeg_features = {key for key, val in ffmpeg_features.items() if val}
if ffmpeg_features:
- exe_versions['ffmpeg'] += ' (%s)' % ','.join(sorted(ffmpeg_features))
+ exe_versions['ffmpeg'] += ' ({})'.format(','.join(sorted(ffmpeg_features)))
exe_versions['rtmpdump'] = rtmpdump_version()
exe_versions['phantomjs'] = PhantomJSwrapper._version()
exe_str = ', '.join(
f'{exe} {v}' for exe, v in sorted(exe_versions.items()) if v
) or 'none'
- write_debug('exe versions: %s' % exe_str)
+ write_debug(f'exe versions: {exe_str}')
from .compat.compat_utils import get_package_info
from .dependencies import available_dependencies
@@ -4045,7 +4065,7 @@ class YoutubeDL:
write_debug(f'Proxy map: {self.proxies}')
write_debug(f'Request Handlers: {", ".join(rh.RH_NAME for rh in self._request_director.handlers.values())}')
for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items():
- display_list = ['%s%s' % (
+ display_list = ['{}{}'.format(
klass.__name__, '' if klass.__name__ == name else f' as {name}')
for name, klass in plugins.items()]
if plugin_type == 'Extractor':
@@ -4062,14 +4082,13 @@ class YoutubeDL:
# Not implemented
if False and self.params.get('call_home'):
ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode()
- write_debug('Public IP address: %s' % ipaddr)
+ write_debug(f'Public IP address: {ipaddr}')
latest_version = self.urlopen(
'https://yt-dl.org/latest/version').read().decode()
if version_tuple(latest_version) > version_tuple(__version__):
self.report_warning(
- 'You are using an outdated version (newest version: %s)! '
- 'See https://yt-dl.org/update if you need help updating.' %
- latest_version)
+ f'You are using an outdated version (newest version: {latest_version})! '
+ 'See https://yt-dl.org/update if you need help updating.')
@functools.cached_property
def proxies(self):
@@ -4103,7 +4122,7 @@ class YoutubeDL:
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
def _get_available_impersonate_targets(self):
- # todo(future): make available as public API
+ # TODO(future): make available as public API
return [
(target, rh.RH_NAME)
for rh in self._request_director.handlers.values()
@@ -4112,7 +4131,7 @@ class YoutubeDL:
]
def _impersonate_target_available(self, target):
- # todo(future): make available as public API
+ # TODO(future): make available as public API
return any(
rh.is_supported_target(target)
for rh in self._request_director.handlers.values()
@@ -4238,7 +4257,7 @@ class YoutubeDL:
return encoding
def _write_info_json(self, label, ie_result, infofn, overwrite=None):
- ''' Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error '''
+ """ Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error """
if overwrite is None:
overwrite = self.params.get('overwrites', True)
if not self.params.get('writeinfojson'):
@@ -4261,7 +4280,7 @@ class YoutubeDL:
return None
def _write_description(self, label, ie_result, descfn):
- ''' Write description and returns True = written, False = skip, None = error '''
+ """ Write description and returns True = written, False = skip, None = error """
if not self.params.get('writedescription'):
return False
elif not descfn:
@@ -4285,7 +4304,7 @@ class YoutubeDL:
return True
def _write_subtitles(self, info_dict, filename):
- ''' Write subtitles to file and return list of (sub_filename, final_sub_filename); or None if error'''
+ """ Write subtitles to file and return list of (sub_filename, final_sub_filename); or None if error"""
ret = []
subtitles = info_dict.get('requested_subtitles')
if not (self.params.get('writesubtitles') or self.params.get('writeautomaticsub')):
@@ -4331,7 +4350,7 @@ class YoutubeDL:
self.dl(sub_filename, sub_copy, subtitle=True)
sub_info['filepath'] = sub_filename
ret.append((sub_filename, sub_filename_final))
- except (DownloadError, ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
+ except (DownloadError, ExtractorError, OSError, ValueError, *network_exceptions) as err:
msg = f'Unable to download video subtitles for {sub_lang!r}: {err}'
if self.params.get('ignoreerrors') is not True: # False or 'only_download'
if not self.params.get('ignoreerrors'):
@@ -4341,7 +4360,7 @@ class YoutubeDL:
return ret
def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None):
- ''' Write thumbnails to file and return list of (thumb_filename, final_thumb_filename); or None if error '''
+ """ Write thumbnails to file and return list of (thumb_filename, final_thumb_filename); or None if error """
write_all = self.params.get('write_all_thumbnails', False)
thumbnails, ret = [], []
if write_all or self.params.get('writethumbnail', False):
@@ -4368,8 +4387,8 @@ class YoutubeDL:
existing_thumb = self.existing_file((thumb_filename_final, thumb_filename))
if existing_thumb:
- self.to_screen('[info] %s is already present' % (
- thumb_display_id if multiple else f'{label} thumbnail').capitalize())
+ self.to_screen('[info] {} is already present'.format((
+ thumb_display_id if multiple else f'{label} thumbnail').capitalize()))
t['filepath'] = existing_thumb
ret.append((existing_thumb, thumb_filename_final))
else: