diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:49:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:49:24 +0000 |
commit | 2415e66f889f38503b73e8ebc5f43ca342390e5c (patch) | |
tree | ac48ab69d1d96bae3d83756134921e0d90593aa5 /yt_dlp/downloader | |
parent | Initial commit. (diff) | |
download | yt-dlp-2415e66f889f38503b73e8ebc5f43ca342390e5c.tar.xz yt-dlp-2415e66f889f38503b73e8ebc5f43ca342390e5c.zip |
Adding upstream version 2024.03.10.upstream/2024.03.10
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'yt_dlp/downloader')
-rw-r--r-- | yt_dlp/downloader/__init__.py | 131 | ||||
-rw-r--r-- | yt_dlp/downloader/common.py | 486 | ||||
-rw-r--r-- | yt_dlp/downloader/dash.py | 90 | ||||
-rw-r--r-- | yt_dlp/downloader/external.py | 664 | ||||
-rw-r--r-- | yt_dlp/downloader/f4m.py | 427 | ||||
-rw-r--r-- | yt_dlp/downloader/fc2.py | 46 | ||||
-rw-r--r-- | yt_dlp/downloader/fragment.py | 527 | ||||
-rw-r--r-- | yt_dlp/downloader/hls.py | 378 | ||||
-rw-r--r-- | yt_dlp/downloader/http.py | 383 | ||||
-rw-r--r-- | yt_dlp/downloader/ism.py | 283 | ||||
-rw-r--r-- | yt_dlp/downloader/mhtml.py | 189 | ||||
-rw-r--r-- | yt_dlp/downloader/niconico.py | 140 | ||||
-rw-r--r-- | yt_dlp/downloader/rtmp.py | 213 | ||||
-rw-r--r-- | yt_dlp/downloader/rtsp.py | 42 | ||||
-rw-r--r-- | yt_dlp/downloader/websocket.py | 53 | ||||
-rw-r--r-- | yt_dlp/downloader/youtube_live_chat.py | 228 |
16 files changed, 4280 insertions, 0 deletions
diff --git a/yt_dlp/downloader/__init__.py b/yt_dlp/downloader/__init__.py new file mode 100644 index 0000000..51a9f28 --- /dev/null +++ b/yt_dlp/downloader/__init__.py @@ -0,0 +1,131 @@ +from ..utils import NO_DEFAULT, determine_protocol + + +def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=None, to_stdout=False): + info_dict['protocol'] = determine_protocol(info_dict) + info_copy = info_dict.copy() + info_copy['to_stdout'] = to_stdout + + protocols = (protocol or info_copy['protocol']).split('+') + downloaders = [_get_suitable_downloader(info_copy, proto, params, default) for proto in protocols] + + if set(downloaders) == {FFmpegFD} and FFmpegFD.can_merge_formats(info_copy, params): + return FFmpegFD + elif (set(downloaders) == {DashSegmentsFD} + and not (to_stdout and len(protocols) > 1) + and set(protocols) == {'http_dash_segments_generator'}): + return DashSegmentsFD + elif len(downloaders) == 1: + return downloaders[0] + return None + + +# Some of these require get_suitable_downloader +from .common import FileDownloader +from .dash import DashSegmentsFD +from .external import FFmpegFD, get_external_downloader +from .f4m import F4mFD +from .fc2 import FC2LiveFD +from .hls import HlsFD +from .http import HttpFD +from .ism import IsmFD +from .mhtml import MhtmlFD +from .niconico import NiconicoDmcFD, NiconicoLiveFD +from .rtmp import RtmpFD +from .rtsp import RtspFD +from .websocket import WebSocketFragmentFD +from .youtube_live_chat import YoutubeLiveChatFD + +PROTOCOL_MAP = { + 'rtmp': RtmpFD, + 'rtmpe': RtmpFD, + 'rtmp_ffmpeg': FFmpegFD, + 'm3u8_native': HlsFD, + 'm3u8': FFmpegFD, + 'mms': RtspFD, + 'rtsp': RtspFD, + 'f4m': F4mFD, + 'http_dash_segments': DashSegmentsFD, + 'http_dash_segments_generator': DashSegmentsFD, + 'ism': IsmFD, + 'mhtml': MhtmlFD, + 'niconico_dmc': NiconicoDmcFD, + 'niconico_live': NiconicoLiveFD, + 'fc2_live': FC2LiveFD, + 'websocket_frag': WebSocketFragmentFD, + 'youtube_live_chat': YoutubeLiveChatFD, + 'youtube_live_chat_replay': YoutubeLiveChatFD, +} + + +def shorten_protocol_name(proto, simplify=False): + short_protocol_names = { + 'm3u8_native': 'm3u8', + 'm3u8': 'm3u8F', + 'rtmp_ffmpeg': 'rtmpF', + 'http_dash_segments': 'dash', + 'http_dash_segments_generator': 'dashG', + 'niconico_dmc': 'dmc', + 'websocket_frag': 'WSfrag', + } + if simplify: + short_protocol_names.update({ + 'https': 'http', + 'ftps': 'ftp', + 'm3u8': 'm3u8', # Reverse above m3u8 mapping + 'm3u8_native': 'm3u8', + 'http_dash_segments_generator': 'dash', + 'rtmp_ffmpeg': 'rtmp', + 'm3u8_frag_urls': 'm3u8', + 'dash_frag_urls': 'dash', + }) + return short_protocol_names.get(proto, proto) + + +def _get_suitable_downloader(info_dict, protocol, params, default): + """Get the downloader class that can handle the info dict.""" + if default is NO_DEFAULT: + default = HttpFD + + if (info_dict.get('section_start') or info_dict.get('section_end')) and FFmpegFD.can_download(info_dict): + return FFmpegFD + + info_dict['protocol'] = protocol + downloaders = params.get('external_downloader') + external_downloader = ( + downloaders if isinstance(downloaders, str) or downloaders is None + else downloaders.get(shorten_protocol_name(protocol, True), downloaders.get('default'))) + + if external_downloader is None: + if info_dict['to_stdout'] and FFmpegFD.can_merge_formats(info_dict, params): + return FFmpegFD + elif external_downloader.lower() != 'native': + ed = get_external_downloader(external_downloader) + if ed.can_download(info_dict, external_downloader): + return ed + + if protocol == 'http_dash_segments': + if info_dict.get('is_live') and (external_downloader or '').lower() != 'native': + return FFmpegFD + + if protocol in ('m3u8', 'm3u8_native'): + if info_dict.get('is_live'): + return FFmpegFD + elif (external_downloader or '').lower() == 'native': + return HlsFD + elif protocol == 'm3u8_native' and get_suitable_downloader( + info_dict, params, None, protocol='m3u8_frag_urls', to_stdout=info_dict['to_stdout']): + return HlsFD + elif params.get('hls_prefer_native') is True: + return HlsFD + elif params.get('hls_prefer_native') is False: + return FFmpegFD + + return PROTOCOL_MAP.get(protocol, default) + + +__all__ = [ + 'FileDownloader', + 'get_suitable_downloader', + 'shorten_protocol_name', +] diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py new file mode 100644 index 0000000..b71d7ee --- /dev/null +++ b/yt_dlp/downloader/common.py @@ -0,0 +1,486 @@ +import contextlib +import errno +import functools +import os +import random +import re +import time + +from ..minicurses import ( + BreaklineStatusPrinter, + MultilineLogger, + MultilinePrinter, + QuietMultilinePrinter, +) +from ..utils import ( + IDENTITY, + NO_DEFAULT, + LockingUnsupportedError, + Namespace, + RetryManager, + classproperty, + decodeArgument, + deprecation_warning, + encodeFilename, + format_bytes, + join_nonempty, + parse_bytes, + remove_start, + sanitize_open, + shell_quote, + timeconvert, + timetuple_from_msec, + try_call, +) + + +class FileDownloader: + """File Downloader class. + + File downloader objects are the ones responsible of downloading the + actual video file and writing it to disk. + + File downloaders accept a lot of parameters. In order not to saturate + the object constructor with arguments, it receives a dictionary of + options instead. + + Available options: + + verbose: Print additional info to stdout. + quiet: Do not print messages to stdout. + ratelimit: Download speed limit, in bytes/sec. + throttledratelimit: Assume the download is being throttled below this speed (bytes/sec) + retries: Number of times to retry for expected network errors. + Default is 0 for API, but 10 for CLI + file_access_retries: Number of times to retry on file access error (default: 3) + buffersize: Size of download buffer in bytes. + noresizebuffer: Do not automatically resize the download buffer. + continuedl: Try to continue downloads if possible. + noprogress: Do not print the progress bar. + nopart: Do not use temporary .part files. + updatetime: Use the Last-modified header to set output file timestamps. + test: Download only first bytes to test the downloader. + min_filesize: Skip files smaller than this size + max_filesize: Skip files larger than this size + xattr_set_filesize: Set ytdl.filesize user xattribute with expected size. + external_downloader_args: A dictionary of downloader keys (in lower case) + and a list of additional command-line arguments for the + executable. Use 'default' as the name for arguments to be + passed to all downloaders. For compatibility with youtube-dl, + a single list of args can also be used + hls_use_mpegts: Use the mpegts container for HLS videos. + http_chunk_size: Size of a chunk for chunk-based HTTP downloading. May be + useful for bypassing bandwidth throttling imposed by + a webserver (experimental) + progress_template: See YoutubeDL.py + retry_sleep_functions: See YoutubeDL.py + + Subclasses of this one must re-define the real_download method. + """ + + _TEST_FILE_SIZE = 10241 + params = None + + def __init__(self, ydl, params): + """Create a FileDownloader object with the given options.""" + self._set_ydl(ydl) + self._progress_hooks = [] + self.params = params + self._prepare_multiline_status() + self.add_progress_hook(self.report_progress) + + def _set_ydl(self, ydl): + self.ydl = ydl + + for func in ( + 'deprecation_warning', + 'deprecated_feature', + 'report_error', + 'report_file_already_downloaded', + 'report_warning', + 'to_console_title', + 'to_stderr', + 'trouble', + 'write_debug', + ): + if not hasattr(self, func): + setattr(self, func, getattr(ydl, func)) + + def to_screen(self, *args, **kargs): + self.ydl.to_screen(*args, quiet=self.params.get('quiet'), **kargs) + + __to_screen = to_screen + + @classproperty + def FD_NAME(cls): + return re.sub(r'(?<=[a-z])(?=[A-Z])', '_', cls.__name__[:-2]).lower() + + @staticmethod + def format_seconds(seconds): + if seconds is None: + return ' Unknown' + time = timetuple_from_msec(seconds * 1000) + if time.hours > 99: + return '--:--:--' + return '%02d:%02d:%02d' % time[:-1] + + @classmethod + def format_eta(cls, seconds): + return f'{remove_start(cls.format_seconds(seconds), "00:"):>8s}' + + @staticmethod + def calc_percent(byte_counter, data_len): + if data_len is None: + return None + return float(byte_counter) / float(data_len) * 100.0 + + @staticmethod + def format_percent(percent): + return ' N/A%' if percent is None else f'{percent:>5.1f}%' + + @classmethod + def calc_eta(cls, start_or_rate, now_or_remaining, total=NO_DEFAULT, current=NO_DEFAULT): + if total is NO_DEFAULT: + rate, remaining = start_or_rate, now_or_remaining + if None in (rate, remaining): + return None + return int(float(remaining) / rate) + + start, now = start_or_rate, now_or_remaining + if total is None: + return None + if now is None: + now = time.time() + rate = cls.calc_speed(start, now, current) + return rate and int((float(total) - float(current)) / rate) + + @staticmethod + def calc_speed(start, now, bytes): + dif = now - start + if bytes == 0 or dif < 0.001: # One millisecond + return None + return float(bytes) / dif + + @staticmethod + def format_speed(speed): + return ' Unknown B/s' if speed is None else f'{format_bytes(speed):>10s}/s' + + @staticmethod + def format_retries(retries): + return 'inf' if retries == float('inf') else int(retries) + + @staticmethod + def filesize_or_none(unencoded_filename): + if os.path.isfile(unencoded_filename): + return os.path.getsize(unencoded_filename) + return 0 + + @staticmethod + def best_block_size(elapsed_time, bytes): + new_min = max(bytes / 2.0, 1.0) + new_max = min(max(bytes * 2.0, 1.0), 4194304) # Do not surpass 4 MB + if elapsed_time < 0.001: + return int(new_max) + rate = bytes / elapsed_time + if rate > new_max: + return int(new_max) + if rate < new_min: + return int(new_min) + return int(rate) + + @staticmethod + def parse_bytes(bytestr): + """Parse a string indicating a byte quantity into an integer.""" + deprecation_warning('yt_dlp.FileDownloader.parse_bytes is deprecated and ' + 'may be removed in the future. Use yt_dlp.utils.parse_bytes instead') + return parse_bytes(bytestr) + + def slow_down(self, start_time, now, byte_counter): + """Sleep if the download speed is over the rate limit.""" + rate_limit = self.params.get('ratelimit') + if rate_limit is None or byte_counter == 0: + return + if now is None: + now = time.time() + elapsed = now - start_time + if elapsed <= 0.0: + return + speed = float(byte_counter) / elapsed + if speed > rate_limit: + sleep_time = float(byte_counter) / rate_limit - elapsed + if sleep_time > 0: + time.sleep(sleep_time) + + def temp_name(self, filename): + """Returns a temporary filename for the given filename.""" + if self.params.get('nopart', False) or filename == '-' or \ + (os.path.exists(encodeFilename(filename)) and not os.path.isfile(encodeFilename(filename))): + return filename + return filename + '.part' + + def undo_temp_name(self, filename): + if filename.endswith('.part'): + return filename[:-len('.part')] + return filename + + def ytdl_filename(self, filename): + return filename + '.ytdl' + + def wrap_file_access(action, *, fatal=False): + def error_callback(err, count, retries, *, fd): + return RetryManager.report_retry( + err, count, retries, info=fd.__to_screen, + warn=lambda e: (time.sleep(0.01), fd.to_screen(f'[download] Unable to {action} file: {e}')), + error=None if fatal else lambda e: fd.report_error(f'Unable to {action} file: {e}'), + sleep_func=fd.params.get('retry_sleep_functions', {}).get('file_access')) + + def wrapper(self, func, *args, **kwargs): + for retry in RetryManager(self.params.get('file_access_retries', 3), error_callback, fd=self): + try: + return func(self, *args, **kwargs) + except OSError as err: + if err.errno in (errno.EACCES, errno.EINVAL): + retry.error = err + continue + retry.error_callback(err, 1, 0) + + return functools.partial(functools.partialmethod, wrapper) + + @wrap_file_access('open', fatal=True) + def sanitize_open(self, filename, open_mode): + f, filename = sanitize_open(filename, open_mode) + if not getattr(f, 'locked', None): + self.write_debug(f'{LockingUnsupportedError.msg}. Proceeding without locking', only_once=True) + return f, filename + + @wrap_file_access('remove') + def try_remove(self, filename): + if os.path.isfile(filename): + os.remove(filename) + + @wrap_file_access('rename') + def try_rename(self, old_filename, new_filename): + if old_filename == new_filename: + return + os.replace(old_filename, new_filename) + + def try_utime(self, filename, last_modified_hdr): + """Try to set the last-modified time of the given file.""" + if last_modified_hdr is None: + return + if not os.path.isfile(encodeFilename(filename)): + return + timestr = last_modified_hdr + if timestr is None: + return + filetime = timeconvert(timestr) + if filetime is None: + return filetime + # Ignore obviously invalid dates + if filetime == 0: + return + with contextlib.suppress(Exception): + os.utime(filename, (time.time(), filetime)) + return filetime + + def report_destination(self, filename): + """Report destination filename.""" + self.to_screen('[download] Destination: ' + filename) + + def _prepare_multiline_status(self, lines=1): + if self.params.get('noprogress'): + self._multiline = QuietMultilinePrinter() + elif self.ydl.params.get('logger'): + self._multiline = MultilineLogger(self.ydl.params['logger'], lines) + elif self.params.get('progress_with_newline'): + self._multiline = BreaklineStatusPrinter(self.ydl._out_files.out, lines) + else: + self._multiline = MultilinePrinter(self.ydl._out_files.out, lines, not self.params.get('quiet')) + self._multiline.allow_colors = self.ydl._allow_colors.out and self.ydl._allow_colors.out != 'no_color' + self._multiline._HAVE_FULLCAP = self.ydl._allow_colors.out + + def _finish_multiline_status(self): + self._multiline.end() + + ProgressStyles = Namespace( + downloaded_bytes='light blue', + percent='light blue', + eta='yellow', + speed='green', + elapsed='bold white', + total_bytes='', + total_bytes_estimate='', + ) + + def _report_progress_status(self, s, default_template): + for name, style in self.ProgressStyles.items_: + name = f'_{name}_str' + if name not in s: + continue + s[name] = self._format_progress(s[name], style) + s['_default_template'] = default_template % s + + progress_dict = s.copy() + progress_dict.pop('info_dict') + progress_dict = {'info': s['info_dict'], 'progress': progress_dict} + + progress_template = self.params.get('progress_template', {}) + self._multiline.print_at_line(self.ydl.evaluate_outtmpl( + progress_template.get('download') or '[download] %(progress._default_template)s', + progress_dict), s.get('progress_idx') or 0) + self.to_console_title(self.ydl.evaluate_outtmpl( + progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s', + progress_dict)) + + def _format_progress(self, *args, **kwargs): + return self.ydl._format_text( + self._multiline.stream, self._multiline.allow_colors, *args, **kwargs) + + def report_progress(self, s): + def with_fields(*tups, default=''): + for *fields, tmpl in tups: + if all(s.get(f) is not None for f in fields): + return tmpl + return default + + _format_bytes = lambda k: f'{format_bytes(s.get(k)):>10s}' + + if s['status'] == 'finished': + if self.params.get('noprogress'): + self.to_screen('[download] Download completed') + speed = try_call(lambda: s['total_bytes'] / s['elapsed']) + s.update({ + 'speed': speed, + '_speed_str': self.format_speed(speed).strip(), + '_total_bytes_str': _format_bytes('total_bytes'), + '_elapsed_str': self.format_seconds(s.get('elapsed')), + '_percent_str': self.format_percent(100), + }) + self._report_progress_status(s, join_nonempty( + '100%%', + with_fields(('total_bytes', 'of %(_total_bytes_str)s')), + with_fields(('elapsed', 'in %(_elapsed_str)s')), + with_fields(('speed', 'at %(_speed_str)s')), + delim=' ')) + + if s['status'] != 'downloading': + return + + s.update({ + '_eta_str': self.format_eta(s.get('eta')).strip(), + '_speed_str': self.format_speed(s.get('speed')), + '_percent_str': self.format_percent(try_call( + lambda: 100 * s['downloaded_bytes'] / s['total_bytes'], + lambda: 100 * s['downloaded_bytes'] / s['total_bytes_estimate'], + lambda: s['downloaded_bytes'] == 0 and 0)), + '_total_bytes_str': _format_bytes('total_bytes'), + '_total_bytes_estimate_str': _format_bytes('total_bytes_estimate'), + '_downloaded_bytes_str': _format_bytes('downloaded_bytes'), + '_elapsed_str': self.format_seconds(s.get('elapsed')), + }) + + msg_template = with_fields( + ('total_bytes', '%(_percent_str)s of %(_total_bytes_str)s at %(_speed_str)s ETA %(_eta_str)s'), + ('total_bytes_estimate', '%(_percent_str)s of ~%(_total_bytes_estimate_str)s at %(_speed_str)s ETA %(_eta_str)s'), + ('downloaded_bytes', 'elapsed', '%(_downloaded_bytes_str)s at %(_speed_str)s (%(_elapsed_str)s)'), + ('downloaded_bytes', '%(_downloaded_bytes_str)s at %(_speed_str)s'), + default='%(_percent_str)s at %(_speed_str)s ETA %(_eta_str)s') + + msg_template += with_fields( + ('fragment_index', 'fragment_count', ' (frag %(fragment_index)s/%(fragment_count)s)'), + ('fragment_index', ' (frag %(fragment_index)s)')) + self._report_progress_status(s, msg_template) + + def report_resuming_byte(self, resume_len): + """Report attempt to resume at given byte.""" + self.to_screen('[download] Resuming download at byte %s' % resume_len) + + def report_retry(self, err, count, retries, frag_index=NO_DEFAULT, fatal=True): + """Report retry""" + is_frag = False if frag_index is NO_DEFAULT else 'fragment' + RetryManager.report_retry( + err, count, retries, info=self.__to_screen, + warn=lambda msg: self.__to_screen(f'[download] Got error: {msg}'), + error=IDENTITY if not fatal else lambda e: self.report_error(f'\r[download] Got error: {e}'), + sleep_func=self.params.get('retry_sleep_functions', {}).get(is_frag or 'http'), + suffix=f'fragment{"s" if frag_index is None else f" {frag_index}"}' if is_frag else None) + + def report_unable_to_resume(self): + """Report it was impossible to resume download.""" + self.to_screen('[download] Unable to resume') + + @staticmethod + def supports_manifest(manifest): + """ Whether the downloader can download the fragments from the manifest. + Redefine in subclasses if needed. """ + pass + + def download(self, filename, info_dict, subtitle=False): + """Download to a filename using the info from info_dict + Return True on success and False otherwise + """ + nooverwrites_and_exists = ( + not self.params.get('overwrites', True) + and os.path.exists(encodeFilename(filename)) + ) + + if not hasattr(filename, 'write'): + continuedl_and_exists = ( + self.params.get('continuedl', True) + and os.path.isfile(encodeFilename(filename)) + and not self.params.get('nopart', False) + ) + + # Check file already present + if filename != '-' and (nooverwrites_and_exists or continuedl_and_exists): + self.report_file_already_downloaded(filename) + self._hook_progress({ + 'filename': filename, + 'status': 'finished', + 'total_bytes': os.path.getsize(encodeFilename(filename)), + }, info_dict) + self._finish_multiline_status() + return True, False + + if subtitle: + sleep_interval = self.params.get('sleep_interval_subtitles') or 0 + else: + min_sleep_interval = self.params.get('sleep_interval') or 0 + sleep_interval = random.uniform( + min_sleep_interval, self.params.get('max_sleep_interval') or min_sleep_interval) + if sleep_interval > 0: + self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...') + time.sleep(sleep_interval) + + ret = self.real_download(filename, info_dict) + self._finish_multiline_status() + return ret, True + + def real_download(self, filename, info_dict): + """Real download process. Redefine in subclasses.""" + raise NotImplementedError('This method must be implemented by subclasses') + + def _hook_progress(self, status, info_dict): + # Ideally we want to make a copy of the dict, but that is too slow + status['info_dict'] = info_dict + # youtube-dl passes the same status object to all the hooks. + # Some third party scripts seems to be relying on this. + # So keep this behavior if possible + for ph in self._progress_hooks: + ph(status) + + def add_progress_hook(self, ph): + # See YoutubeDl.py (search for progress_hooks) for a description of + # this interface + self._progress_hooks.append(ph) + + def _debug_cmd(self, args, exe=None): + if not self.params.get('verbose', False): + return + + str_args = [decodeArgument(a) for a in args] + + if exe is None: + exe = os.path.basename(str_args[0]) + + self.write_debug(f'{exe} command line: {shell_quote(str_args)}') diff --git a/yt_dlp/downloader/dash.py b/yt_dlp/downloader/dash.py new file mode 100644 index 0000000..afc79b6 --- /dev/null +++ b/yt_dlp/downloader/dash.py @@ -0,0 +1,90 @@ +import time +import urllib.parse + +from . import get_suitable_downloader +from .fragment import FragmentFD +from ..utils import update_url_query, urljoin + + +class DashSegmentsFD(FragmentFD): + """ + Download segments in a DASH manifest. External downloaders can take over + the fragment downloads by supporting the 'dash_frag_urls' protocol + """ + + FD_NAME = 'dashsegments' + + def real_download(self, filename, info_dict): + if 'http_dash_segments_generator' in info_dict['protocol'].split('+'): + real_downloader = None # No external FD can support --live-from-start + else: + if info_dict.get('is_live'): + self.report_error('Live DASH videos are not supported') + real_downloader = get_suitable_downloader( + info_dict, self.params, None, protocol='dash_frag_urls', to_stdout=(filename == '-')) + + real_start = time.time() + + requested_formats = [{**info_dict, **fmt} for fmt in info_dict.get('requested_formats', [])] + args = [] + for fmt in requested_formats or [info_dict]: + try: + fragment_count = 1 if self.params.get('test') else len(fmt['fragments']) + except TypeError: + fragment_count = None + ctx = { + 'filename': fmt.get('filepath') or filename, + 'live': 'is_from_start' if fmt.get('is_from_start') else fmt.get('is_live'), + 'total_frags': fragment_count, + } + + if real_downloader: + self._prepare_external_frag_download(ctx) + else: + self._prepare_and_start_frag_download(ctx, fmt) + ctx['start'] = real_start + + extra_query = None + extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url') + if extra_param_to_segment_url: + extra_query = urllib.parse.parse_qs(extra_param_to_segment_url) + + fragments_to_download = self._get_fragments(fmt, ctx, extra_query) + + if real_downloader: + self.to_screen( + f'[{self.FD_NAME}] Fragment downloads will be delegated to {real_downloader.get_basename()}') + info_dict['fragments'] = list(fragments_to_download) + fd = real_downloader(self.ydl, self.params) + return fd.real_download(filename, info_dict) + + args.append([ctx, fragments_to_download, fmt]) + + return self.download_and_append_fragments_multiple(*args, is_fatal=lambda idx: idx == 0) + + def _resolve_fragments(self, fragments, ctx): + fragments = fragments(ctx) if callable(fragments) else fragments + return [next(iter(fragments))] if self.params.get('test') else fragments + + def _get_fragments(self, fmt, ctx, extra_query): + fragment_base_url = fmt.get('fragment_base_url') + fragments = self._resolve_fragments(fmt['fragments'], ctx) + + frag_index = 0 + for i, fragment in enumerate(fragments): + frag_index += 1 + if frag_index <= ctx['fragment_index']: + continue + fragment_url = fragment.get('url') + if not fragment_url: + assert fragment_base_url + fragment_url = urljoin(fragment_base_url, fragment['path']) + if extra_query: + fragment_url = update_url_query(fragment_url, extra_query) + + yield { + 'frag_index': frag_index, + 'fragment_count': fragment.get('fragment_count'), + 'index': i, + 'url': fragment_url, + } diff --git a/yt_dlp/downloader/external.py b/yt_dlp/downloader/external.py new file mode 100644 index 0000000..ce5eeb0 --- /dev/null +++ b/yt_dlp/downloader/external.py @@ -0,0 +1,664 @@ +import enum +import json +import os +import re +import subprocess +import sys +import tempfile +import time +import uuid + +from .fragment import FragmentFD +from ..compat import functools +from ..networking import Request +from ..postprocessor.ffmpeg import EXT_TO_OUT_FORMATS, FFmpegPostProcessor +from ..utils import ( + Popen, + RetryManager, + _configuration_args, + check_executable, + classproperty, + cli_bool_option, + cli_option, + cli_valueless_option, + determine_ext, + encodeArgument, + encodeFilename, + find_available_port, + remove_end, + traverse_obj, +) + + +class Features(enum.Enum): + TO_STDOUT = enum.auto() + MULTIPLE_FORMATS = enum.auto() + + +class ExternalFD(FragmentFD): + SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps') + SUPPORTED_FEATURES = () + _CAPTURE_STDERR = True + + def real_download(self, filename, info_dict): + self.report_destination(filename) + tmpfilename = self.temp_name(filename) + self._cookies_tempfile = None + + try: + started = time.time() + retval = self._call_downloader(tmpfilename, info_dict) + except KeyboardInterrupt: + if not info_dict.get('is_live'): + raise + # Live stream downloading cancellation should be considered as + # correct and expected termination thus all postprocessing + # should take place + retval = 0 + self.to_screen('[%s] Interrupted by user' % self.get_basename()) + finally: + if self._cookies_tempfile: + self.try_remove(self._cookies_tempfile) + + if retval == 0: + status = { + 'filename': filename, + 'status': 'finished', + 'elapsed': time.time() - started, + } + if filename != '-': + fsize = os.path.getsize(encodeFilename(tmpfilename)) + self.try_rename(tmpfilename, filename) + status.update({ + 'downloaded_bytes': fsize, + 'total_bytes': fsize, + }) + self._hook_progress(status, info_dict) + return True + else: + self.to_stderr('\n') + self.report_error('%s exited with code %d' % ( + self.get_basename(), retval)) + return False + + @classmethod + def get_basename(cls): + return cls.__name__[:-2].lower() + + @classproperty + def EXE_NAME(cls): + return cls.get_basename() + + @functools.cached_property + def exe(self): + return self.EXE_NAME + + @classmethod + def available(cls, path=None): + path = check_executable( + cls.EXE_NAME if path in (None, cls.get_basename()) else path, + [cls.AVAILABLE_OPT]) + if not path: + return False + cls.exe = path + return path + + @classmethod + def supports(cls, info_dict): + return all(( + not info_dict.get('to_stdout') or Features.TO_STDOUT in cls.SUPPORTED_FEATURES, + '+' not in info_dict['protocol'] or Features.MULTIPLE_FORMATS in cls.SUPPORTED_FEATURES, + not traverse_obj(info_dict, ('hls_aes', ...), 'extra_param_to_segment_url'), + all(proto in cls.SUPPORTED_PROTOCOLS for proto in info_dict['protocol'].split('+')), + )) + + @classmethod + def can_download(cls, info_dict, path=None): + return cls.available(path) and cls.supports(info_dict) + + def _option(self, command_option, param): + return cli_option(self.params, command_option, param) + + def _bool_option(self, command_option, param, true_value='true', false_value='false', separator=None): + return cli_bool_option(self.params, command_option, param, true_value, false_value, separator) + + def _valueless_option(self, command_option, param, expected_value=True): + return cli_valueless_option(self.params, command_option, param, expected_value) + + def _configuration_args(self, keys=None, *args, **kwargs): + return _configuration_args( + self.get_basename(), self.params.get('external_downloader_args'), self.EXE_NAME, + keys, *args, **kwargs) + + def _write_cookies(self): + if not self.ydl.cookiejar.filename: + tmp_cookies = tempfile.NamedTemporaryFile(suffix='.cookies', delete=False) + tmp_cookies.close() + self._cookies_tempfile = tmp_cookies.name + self.to_screen(f'[download] Writing temporary cookies file to "{self._cookies_tempfile}"') + # real_download resets _cookies_tempfile; if it's None then save() will write to cookiejar.filename + self.ydl.cookiejar.save(self._cookies_tempfile) + return self.ydl.cookiejar.filename or self._cookies_tempfile + + def _call_downloader(self, tmpfilename, info_dict): + """ Either overwrite this or implement _make_cmd """ + cmd = [encodeArgument(a) for a in self._make_cmd(tmpfilename, info_dict)] + + self._debug_cmd(cmd) + + if 'fragments' not in info_dict: + _, stderr, returncode = self._call_process(cmd, info_dict) + if returncode and stderr: + self.to_stderr(stderr) + return returncode + + skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True) + + retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry, + frag_index=None, fatal=not skip_unavailable_fragments) + for retry in retry_manager: + _, stderr, returncode = self._call_process(cmd, info_dict) + if not returncode: + break + # TODO: Decide whether to retry based on error code + # https://aria2.github.io/manual/en/html/aria2c.html#exit-status + if stderr: + self.to_stderr(stderr) + retry.error = Exception() + continue + if not skip_unavailable_fragments and retry_manager.error: + return -1 + + decrypt_fragment = self.decrypter(info_dict) + dest, _ = self.sanitize_open(tmpfilename, 'wb') + for frag_index, fragment in enumerate(info_dict['fragments']): + fragment_filename = '%s-Frag%d' % (tmpfilename, frag_index) + try: + src, _ = self.sanitize_open(fragment_filename, 'rb') + except OSError as err: + if skip_unavailable_fragments and frag_index > 1: + self.report_skip_fragment(frag_index, err) + continue + self.report_error(f'Unable to open fragment {frag_index}; {err}') + return -1 + dest.write(decrypt_fragment(fragment, src.read())) + src.close() + if not self.params.get('keep_fragments', False): + self.try_remove(encodeFilename(fragment_filename)) + dest.close() + self.try_remove(encodeFilename('%s.frag.urls' % tmpfilename)) + return 0 + + def _call_process(self, cmd, info_dict): + return Popen.run(cmd, text=True, stderr=subprocess.PIPE if self._CAPTURE_STDERR else None) + + +class CurlFD(ExternalFD): + AVAILABLE_OPT = '-V' + _CAPTURE_STDERR = False # curl writes the progress to stderr + + def _make_cmd(self, tmpfilename, info_dict): + cmd = [self.exe, '--location', '-o', tmpfilename, '--compressed'] + cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url']) + if cookie_header: + cmd += ['--cookie', cookie_header] + if info_dict.get('http_headers') is not None: + for key, val in info_dict['http_headers'].items(): + cmd += ['--header', f'{key}: {val}'] + + cmd += self._bool_option('--continue-at', 'continuedl', '-', '0') + cmd += self._valueless_option('--silent', 'noprogress') + cmd += self._valueless_option('--verbose', 'verbose') + cmd += self._option('--limit-rate', 'ratelimit') + retry = self._option('--retry', 'retries') + if len(retry) == 2: + if retry[1] in ('inf', 'infinite'): + retry[1] = '2147483647' + cmd += retry + cmd += self._option('--max-filesize', 'max_filesize') + cmd += self._option('--interface', 'source_address') + cmd += self._option('--proxy', 'proxy') + cmd += self._valueless_option('--insecure', 'nocheckcertificate') + cmd += self._configuration_args() + cmd += ['--', info_dict['url']] + return cmd + + +class AxelFD(ExternalFD): + AVAILABLE_OPT = '-V' + + def _make_cmd(self, tmpfilename, info_dict): + cmd = [self.exe, '-o', tmpfilename] + if info_dict.get('http_headers') is not None: + for key, val in info_dict['http_headers'].items(): + cmd += ['-H', f'{key}: {val}'] + cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url']) + if cookie_header: + cmd += ['-H', f'Cookie: {cookie_header}', '--max-redirect=0'] + cmd += self._configuration_args() + cmd += ['--', info_dict['url']] + return cmd + + +class WgetFD(ExternalFD): + AVAILABLE_OPT = '--version' + + def _make_cmd(self, tmpfilename, info_dict): + cmd = [self.exe, '-O', tmpfilename, '-nv', '--compression=auto'] + if self.ydl.cookiejar.get_cookie_header(info_dict['url']): + cmd += ['--load-cookies', self._write_cookies()] + if info_dict.get('http_headers') is not None: + for key, val in info_dict['http_headers'].items(): + cmd += ['--header', f'{key}: {val}'] + cmd += self._option('--limit-rate', 'ratelimit') + retry = self._option('--tries', 'retries') + if len(retry) == 2: + if retry[1] in ('inf', 'infinite'): + retry[1] = '0' + cmd += retry + cmd += self._option('--bind-address', 'source_address') + proxy = self.params.get('proxy') + if proxy: + for var in ('http_proxy', 'https_proxy'): + cmd += ['--execute', f'{var}={proxy}'] + cmd += self._valueless_option('--no-check-certificate', 'nocheckcertificate') + cmd += self._configuration_args() + cmd += ['--', info_dict['url']] + return cmd + + +class Aria2cFD(ExternalFD): + AVAILABLE_OPT = '-v' + SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps', 'dash_frag_urls', 'm3u8_frag_urls') + + @staticmethod + def supports_manifest(manifest): + UNSUPPORTED_FEATURES = [ + r'#EXT-X-BYTERANGE', # playlists composed of byte ranges of media files [1] + # 1. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.2 + ] + check_results = (not re.search(feature, manifest) for feature in UNSUPPORTED_FEATURES) + return all(check_results) + + @staticmethod + def _aria2c_filename(fn): + return fn if os.path.isabs(fn) else f'.{os.path.sep}{fn}' + + def _call_downloader(self, tmpfilename, info_dict): + # FIXME: Disabled due to https://github.com/yt-dlp/yt-dlp/issues/5931 + if False and 'no-external-downloader-progress' not in self.params.get('compat_opts', []): + info_dict['__rpc'] = { + 'port': find_available_port() or 19190, + 'secret': str(uuid.uuid4()), + } + return super()._call_downloader(tmpfilename, info_dict) + + def _make_cmd(self, tmpfilename, info_dict): + cmd = [self.exe, '-c', '--no-conf', + '--console-log-level=warn', '--summary-interval=0', '--download-result=hide', + '--http-accept-gzip=true', '--file-allocation=none', '-x16', '-j16', '-s16'] + if 'fragments' in info_dict: + cmd += ['--allow-overwrite=true', '--allow-piece-length-change=true'] + else: + cmd += ['--min-split-size', '1M'] + + if self.ydl.cookiejar.get_cookie_header(info_dict['url']): + cmd += [f'--load-cookies={self._write_cookies()}'] + if info_dict.get('http_headers') is not None: + for key, val in info_dict['http_headers'].items(): + cmd += ['--header', f'{key}: {val}'] + cmd += self._option('--max-overall-download-limit', 'ratelimit') + cmd += self._option('--interface', 'source_address') + cmd += self._option('--all-proxy', 'proxy') + cmd += self._bool_option('--check-certificate', 'nocheckcertificate', 'false', 'true', '=') + cmd += self._bool_option('--remote-time', 'updatetime', 'true', 'false', '=') + cmd += self._bool_option('--show-console-readout', 'noprogress', 'false', 'true', '=') + cmd += self._configuration_args() + + if '__rpc' in info_dict: + cmd += [ + '--enable-rpc', + f'--rpc-listen-port={info_dict["__rpc"]["port"]}', + f'--rpc-secret={info_dict["__rpc"]["secret"]}'] + + # aria2c strips out spaces from the beginning/end of filenames and paths. + # We work around this issue by adding a "./" to the beginning of the + # filename and relative path, and adding a "/" at the end of the path. + # See: https://github.com/yt-dlp/yt-dlp/issues/276 + # https://github.com/ytdl-org/youtube-dl/issues/20312 + # https://github.com/aria2/aria2/issues/1373 + dn = os.path.dirname(tmpfilename) + if dn: + cmd += ['--dir', self._aria2c_filename(dn) + os.path.sep] + if 'fragments' not in info_dict: + cmd += ['--out', self._aria2c_filename(os.path.basename(tmpfilename))] + cmd += ['--auto-file-renaming=false'] + + if 'fragments' in info_dict: + cmd += ['--uri-selector=inorder'] + url_list_file = '%s.frag.urls' % tmpfilename + url_list = [] + for frag_index, fragment in enumerate(info_dict['fragments']): + fragment_filename = '%s-Frag%d' % (os.path.basename(tmpfilename), frag_index) + url_list.append('%s\n\tout=%s' % (fragment['url'], self._aria2c_filename(fragment_filename))) + stream, _ = self.sanitize_open(url_list_file, 'wb') + stream.write('\n'.join(url_list).encode()) + stream.close() + cmd += ['-i', self._aria2c_filename(url_list_file)] + else: + cmd += ['--', info_dict['url']] + return cmd + + def aria2c_rpc(self, rpc_port, rpc_secret, method, params=()): + # Does not actually need to be UUID, just unique + sanitycheck = str(uuid.uuid4()) + d = json.dumps({ + 'jsonrpc': '2.0', + 'id': sanitycheck, + 'method': method, + 'params': [f'token:{rpc_secret}', *params], + }).encode('utf-8') + request = Request( + f'http://localhost:{rpc_port}/jsonrpc', + data=d, headers={ + 'Content-Type': 'application/json', + 'Content-Length': f'{len(d)}', + }, proxies={'all': None}) + with self.ydl.urlopen(request) as r: + resp = json.load(r) + assert resp.get('id') == sanitycheck, 'Something went wrong with RPC server' + return resp['result'] + + def _call_process(self, cmd, info_dict): + if '__rpc' not in info_dict: + return super()._call_process(cmd, info_dict) + + send_rpc = functools.partial(self.aria2c_rpc, info_dict['__rpc']['port'], info_dict['__rpc']['secret']) + started = time.time() + + fragmented = 'fragments' in info_dict + frag_count = len(info_dict['fragments']) if fragmented else 1 + status = { + 'filename': info_dict.get('_filename'), + 'status': 'downloading', + 'elapsed': 0, + 'downloaded_bytes': 0, + 'fragment_count': frag_count if fragmented else None, + 'fragment_index': 0 if fragmented else None, + } + self._hook_progress(status, info_dict) + + def get_stat(key, *obj, average=False): + val = tuple(filter(None, map(float, traverse_obj(obj, (..., ..., key))))) or [0] + return sum(val) / (len(val) if average else 1) + + with Popen(cmd, text=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) as p: + # Add a small sleep so that RPC client can receive response, + # or the connection stalls infinitely + time.sleep(0.2) + retval = p.poll() + while retval is None: + # We don't use tellStatus as we won't know the GID without reading stdout + # Ref: https://aria2.github.io/manual/en/html/aria2c.html#aria2.tellActive + active = send_rpc('aria2.tellActive') + completed = send_rpc('aria2.tellStopped', [0, frag_count]) + + downloaded = get_stat('totalLength', completed) + get_stat('completedLength', active) + speed = get_stat('downloadSpeed', active) + total = frag_count * get_stat('totalLength', active, completed, average=True) + if total < downloaded: + total = None + + status.update({ + 'downloaded_bytes': int(downloaded), + 'speed': speed, + 'total_bytes': None if fragmented else total, + 'total_bytes_estimate': total, + 'eta': (total - downloaded) / (speed or 1), + 'fragment_index': min(frag_count, len(completed) + 1) if fragmented else None, + 'elapsed': time.time() - started + }) + self._hook_progress(status, info_dict) + + if not active and len(completed) >= frag_count: + send_rpc('aria2.shutdown') + retval = p.wait() + break + + time.sleep(0.1) + retval = p.poll() + + return '', p.stderr.read(), retval + + +class HttpieFD(ExternalFD): + AVAILABLE_OPT = '--version' + EXE_NAME = 'http' + + def _make_cmd(self, tmpfilename, info_dict): + cmd = ['http', '--download', '--output', tmpfilename, info_dict['url']] + + if info_dict.get('http_headers') is not None: + for key, val in info_dict['http_headers'].items(): + cmd += [f'{key}:{val}'] + + # httpie 3.1.0+ removes the Cookie header on redirect, so this should be safe for now. [1] + # If we ever need cookie handling for redirects, we can export the cookiejar into a session. [2] + # 1: https://github.com/httpie/httpie/security/advisories/GHSA-9w4w-cpc8-h2fq + # 2: https://httpie.io/docs/cli/sessions + cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url']) + if cookie_header: + cmd += [f'Cookie:{cookie_header}'] + return cmd + + +class FFmpegFD(ExternalFD): + SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps', 'm3u8', 'm3u8_native', 'rtsp', 'rtmp', 'rtmp_ffmpeg', 'mms', 'http_dash_segments') + SUPPORTED_FEATURES = (Features.TO_STDOUT, Features.MULTIPLE_FORMATS) + + @classmethod + def available(cls, path=None): + # TODO: Fix path for ffmpeg + # Fixme: This may be wrong when --ffmpeg-location is used + return FFmpegPostProcessor().available + + def on_process_started(self, proc, stdin): + """ Override this in subclasses """ + pass + + @classmethod + def can_merge_formats(cls, info_dict, params): + return ( + info_dict.get('requested_formats') + and info_dict.get('protocol') + and not params.get('allow_unplayable_formats') + and 'no-direct-merge' not in params.get('compat_opts', []) + and cls.can_download(info_dict)) + + def _call_downloader(self, tmpfilename, info_dict): + ffpp = FFmpegPostProcessor(downloader=self) + if not ffpp.available: + self.report_error('m3u8 download detected but ffmpeg could not be found. Please install') + return False + ffpp.check_version() + + args = [ffpp.executable, '-y'] + + for log_level in ('quiet', 'verbose'): + if self.params.get(log_level, False): + args += ['-loglevel', log_level] + break + if not self.params.get('verbose'): + args += ['-hide_banner'] + + args += traverse_obj(info_dict, ('downloader_options', 'ffmpeg_args'), default=[]) + + # These exists only for compatibility. Extractors should use + # info_dict['downloader_options']['ffmpeg_args'] instead + args += info_dict.get('_ffmpeg_args') or [] + seekable = info_dict.get('_seekable') + if seekable is not None: + # setting -seekable prevents ffmpeg from guessing if the server + # supports seeking(by adding the header `Range: bytes=0-`), which + # can cause problems in some cases + # https://github.com/ytdl-org/youtube-dl/issues/11800#issuecomment-275037127 + # http://trac.ffmpeg.org/ticket/6125#comment:10 + args += ['-seekable', '1' if seekable else '0'] + + env = None + proxy = self.params.get('proxy') + if proxy: + if not re.match(r'^[\da-zA-Z]+://', proxy): + proxy = 'http://%s' % proxy + + if proxy.startswith('socks'): + self.report_warning( + '%s does not support SOCKS proxies. Downloading is likely to fail. ' + 'Consider adding --hls-prefer-native to your command.' % self.get_basename()) + + # Since December 2015 ffmpeg supports -http_proxy option (see + # http://git.videolan.org/?p=ffmpeg.git;a=commit;h=b4eb1f29ebddd60c41a2eb39f5af701e38e0d3fd) + # We could switch to the following code if we are able to detect version properly + # args += ['-http_proxy', proxy] + env = os.environ.copy() + env['HTTP_PROXY'] = proxy + env['http_proxy'] = proxy + + protocol = info_dict.get('protocol') + + if protocol == 'rtmp': + player_url = info_dict.get('player_url') + page_url = info_dict.get('page_url') + app = info_dict.get('app') + play_path = info_dict.get('play_path') + tc_url = info_dict.get('tc_url') + flash_version = info_dict.get('flash_version') + live = info_dict.get('rtmp_live', False) + conn = info_dict.get('rtmp_conn') + if player_url is not None: + args += ['-rtmp_swfverify', player_url] + if page_url is not None: + args += ['-rtmp_pageurl', page_url] + if app is not None: + args += ['-rtmp_app', app] + if play_path is not None: + args += ['-rtmp_playpath', play_path] + if tc_url is not None: + args += ['-rtmp_tcurl', tc_url] + if flash_version is not None: + args += ['-rtmp_flashver', flash_version] + if live: + args += ['-rtmp_live', 'live'] + if isinstance(conn, list): + for entry in conn: + args += ['-rtmp_conn', entry] + elif isinstance(conn, str): + args += ['-rtmp_conn', conn] + + start_time, end_time = info_dict.get('section_start') or 0, info_dict.get('section_end') + + selected_formats = info_dict.get('requested_formats') or [info_dict] + for i, fmt in enumerate(selected_formats): + is_http = re.match(r'^https?://', fmt['url']) + cookies = self.ydl.cookiejar.get_cookies_for_url(fmt['url']) if is_http else [] + if cookies: + args.extend(['-cookies', ''.join( + f'{cookie.name}={cookie.value}; path={cookie.path}; domain={cookie.domain};\r\n' + for cookie in cookies)]) + if fmt.get('http_headers') and is_http: + # Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv: + # [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header. + args.extend(['-headers', ''.join(f'{key}: {val}\r\n' for key, val in fmt['http_headers'].items())]) + + if start_time: + args += ['-ss', str(start_time)] + if end_time: + args += ['-t', str(end_time - start_time)] + + args += self._configuration_args((f'_i{i + 1}', '_i')) + ['-i', fmt['url']] + + if not (start_time or end_time) or not self.params.get('force_keyframes_at_cuts'): + args += ['-c', 'copy'] + + if info_dict.get('requested_formats') or protocol == 'http_dash_segments': + for i, fmt in enumerate(selected_formats): + stream_number = fmt.get('manifest_stream_number', 0) + args.extend(['-map', f'{i}:{stream_number}']) + + if self.params.get('test', False): + args += ['-fs', str(self._TEST_FILE_SIZE)] + + ext = info_dict['ext'] + if protocol in ('m3u8', 'm3u8_native'): + use_mpegts = (tmpfilename == '-') or self.params.get('hls_use_mpegts') + if use_mpegts is None: + use_mpegts = info_dict.get('is_live') + if use_mpegts: + args += ['-f', 'mpegts'] + else: + args += ['-f', 'mp4'] + if (ffpp.basename == 'ffmpeg' and ffpp._features.get('needs_adtstoasc')) and (not info_dict.get('acodec') or info_dict['acodec'].split('.')[0] in ('aac', 'mp4a')): + args += ['-bsf:a', 'aac_adtstoasc'] + elif protocol == 'rtmp': + args += ['-f', 'flv'] + elif ext == 'mp4' and tmpfilename == '-': + args += ['-f', 'mpegts'] + elif ext == 'unknown_video': + ext = determine_ext(remove_end(tmpfilename, '.part')) + if ext == 'unknown_video': + self.report_warning( + 'The video format is unknown and cannot be downloaded by ffmpeg. ' + 'Explicitly set the extension in the filename to attempt download in that format') + else: + self.report_warning(f'The video format is unknown. Trying to download as {ext} according to the filename') + args += ['-f', EXT_TO_OUT_FORMATS.get(ext, ext)] + else: + args += ['-f', EXT_TO_OUT_FORMATS.get(ext, ext)] + + args += self._configuration_args(('_o1', '_o', '')) + + args = [encodeArgument(opt) for opt in args] + args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True)) + self._debug_cmd(args) + + piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats) + with Popen(args, stdin=subprocess.PIPE, env=env) as proc: + if piped: + self.on_process_started(proc, proc.stdin) + try: + retval = proc.wait() + except BaseException as e: + # subprocces.run would send the SIGKILL signal to ffmpeg and the + # mp4 file couldn't be played, but if we ask ffmpeg to quit it + # produces a file that is playable (this is mostly useful for live + # streams). Note that Windows is not affected and produces playable + # files (see https://github.com/ytdl-org/youtube-dl/issues/8300). + if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32' and not piped: + proc.communicate_or_kill(b'q') + else: + proc.kill(timeout=None) + raise + return retval + + +class AVconvFD(FFmpegFD): + pass + + +_BY_NAME = { + klass.get_basename(): klass + for name, klass in globals().items() + if name.endswith('FD') and name not in ('ExternalFD', 'FragmentFD') +} + + +def list_external_downloaders(): + return sorted(_BY_NAME.keys()) + + +def get_external_downloader(external_downloader): + """ Given the name of the executable, see whether we support the given downloader """ + bn = os.path.splitext(os.path.basename(external_downloader))[0] + return _BY_NAME.get(bn) or next(( + klass for klass in _BY_NAME.values() if klass.EXE_NAME in bn + ), None) diff --git a/yt_dlp/downloader/f4m.py b/yt_dlp/downloader/f4m.py new file mode 100644 index 0000000..28cbba0 --- /dev/null +++ b/yt_dlp/downloader/f4m.py @@ -0,0 +1,427 @@ +import base64 +import io +import itertools +import struct +import time +import urllib.parse + +from .fragment import FragmentFD +from ..compat import compat_etree_fromstring +from ..networking.exceptions import HTTPError +from ..utils import fix_xml_ampersands, xpath_text + + +class DataTruncatedError(Exception): + pass + + +class FlvReader(io.BytesIO): + """ + Reader for Flv files + The file format is documented in https://www.adobe.com/devnet/f4v.html + """ + + def read_bytes(self, n): + data = self.read(n) + if len(data) < n: + raise DataTruncatedError( + 'FlvReader error: need %d bytes while only %d bytes got' % ( + n, len(data))) + return data + + # Utility functions for reading numbers and strings + def read_unsigned_long_long(self): + return struct.unpack('!Q', self.read_bytes(8))[0] + + def read_unsigned_int(self): + return struct.unpack('!I', self.read_bytes(4))[0] + + def read_unsigned_char(self): + return struct.unpack('!B', self.read_bytes(1))[0] + + def read_string(self): + res = b'' + while True: + char = self.read_bytes(1) + if char == b'\x00': + break + res += char + return res + + def read_box_info(self): + """ + Read a box and return the info as a tuple: (box_size, box_type, box_data) + """ + real_size = size = self.read_unsigned_int() + box_type = self.read_bytes(4) + header_end = 8 + if size == 1: + real_size = self.read_unsigned_long_long() + header_end = 16 + return real_size, box_type, self.read_bytes(real_size - header_end) + + def read_asrt(self): + # version + self.read_unsigned_char() + # flags + self.read_bytes(3) + quality_entry_count = self.read_unsigned_char() + # QualityEntryCount + for i in range(quality_entry_count): + self.read_string() + + segment_run_count = self.read_unsigned_int() + segments = [] + for i in range(segment_run_count): + first_segment = self.read_unsigned_int() + fragments_per_segment = self.read_unsigned_int() + segments.append((first_segment, fragments_per_segment)) + + return { + 'segment_run': segments, + } + + def read_afrt(self): + # version + self.read_unsigned_char() + # flags + self.read_bytes(3) + # time scale + self.read_unsigned_int() + + quality_entry_count = self.read_unsigned_char() + # QualitySegmentUrlModifiers + for i in range(quality_entry_count): + self.read_string() + + fragments_count = self.read_unsigned_int() + fragments = [] + for i in range(fragments_count): + first = self.read_unsigned_int() + first_ts = self.read_unsigned_long_long() + duration = self.read_unsigned_int() + if duration == 0: + discontinuity_indicator = self.read_unsigned_char() + else: + discontinuity_indicator = None + fragments.append({ + 'first': first, + 'ts': first_ts, + 'duration': duration, + 'discontinuity_indicator': discontinuity_indicator, + }) + + return { + 'fragments': fragments, + } + + def read_abst(self): + # version + self.read_unsigned_char() + # flags + self.read_bytes(3) + + self.read_unsigned_int() # BootstrapinfoVersion + # Profile,Live,Update,Reserved + flags = self.read_unsigned_char() + live = flags & 0x20 != 0 + # time scale + self.read_unsigned_int() + # CurrentMediaTime + self.read_unsigned_long_long() + # SmpteTimeCodeOffset + self.read_unsigned_long_long() + + self.read_string() # MovieIdentifier + server_count = self.read_unsigned_char() + # ServerEntryTable + for i in range(server_count): + self.read_string() + quality_count = self.read_unsigned_char() + # QualityEntryTable + for i in range(quality_count): + self.read_string() + # DrmData + self.read_string() + # MetaData + self.read_string() + + segments_count = self.read_unsigned_char() + segments = [] + for i in range(segments_count): + box_size, box_type, box_data = self.read_box_info() + assert box_type == b'asrt' + segment = FlvReader(box_data).read_asrt() + segments.append(segment) + fragments_run_count = self.read_unsigned_char() + fragments = [] + for i in range(fragments_run_count): + box_size, box_type, box_data = self.read_box_info() + assert box_type == b'afrt' + fragments.append(FlvReader(box_data).read_afrt()) + + return { + 'segments': segments, + 'fragments': fragments, + 'live': live, + } + + def read_bootstrap_info(self): + total_size, box_type, box_data = self.read_box_info() + assert box_type == b'abst' + return FlvReader(box_data).read_abst() + + +def read_bootstrap_info(bootstrap_bytes): + return FlvReader(bootstrap_bytes).read_bootstrap_info() + + +def build_fragments_list(boot_info): + """ Return a list of (segment, fragment) for each fragment in the video """ + res = [] + segment_run_table = boot_info['segments'][0] + fragment_run_entry_table = boot_info['fragments'][0]['fragments'] + first_frag_number = fragment_run_entry_table[0]['first'] + fragments_counter = itertools.count(first_frag_number) + for segment, fragments_count in segment_run_table['segment_run']: + # In some live HDS streams (e.g. Rai), `fragments_count` is + # abnormal and causing out-of-memory errors. It's OK to change the + # number of fragments for live streams as they are updated periodically + if fragments_count == 4294967295 and boot_info['live']: + fragments_count = 2 + for _ in range(fragments_count): + res.append((segment, next(fragments_counter))) + + if boot_info['live']: + res = res[-2:] + + return res + + +def write_unsigned_int(stream, val): + stream.write(struct.pack('!I', val)) + + +def write_unsigned_int_24(stream, val): + stream.write(struct.pack('!I', val)[1:]) + + +def write_flv_header(stream): + """Writes the FLV header to stream""" + # FLV header + stream.write(b'FLV\x01') + stream.write(b'\x05') + stream.write(b'\x00\x00\x00\x09') + stream.write(b'\x00\x00\x00\x00') + + +def write_metadata_tag(stream, metadata): + """Writes optional metadata tag to stream""" + SCRIPT_TAG = b'\x12' + FLV_TAG_HEADER_LEN = 11 + + if metadata: + stream.write(SCRIPT_TAG) + write_unsigned_int_24(stream, len(metadata)) + stream.write(b'\x00\x00\x00\x00\x00\x00\x00') + stream.write(metadata) + write_unsigned_int(stream, FLV_TAG_HEADER_LEN + len(metadata)) + + +def remove_encrypted_media(media): + return list(filter(lambda e: 'drmAdditionalHeaderId' not in e.attrib + and 'drmAdditionalHeaderSetId' not in e.attrib, + media)) + + +def _add_ns(prop, ver=1): + return '{http://ns.adobe.com/f4m/%d.0}%s' % (ver, prop) + + +def get_base_url(manifest): + base_url = xpath_text( + manifest, [_add_ns('baseURL'), _add_ns('baseURL', 2)], + 'base URL', default=None) + if base_url: + base_url = base_url.strip() + return base_url + + +class F4mFD(FragmentFD): + """ + A downloader for f4m manifests or AdobeHDS. + """ + + def _get_unencrypted_media(self, doc): + media = doc.findall(_add_ns('media')) + if not media: + self.report_error('No media found') + if not self.params.get('allow_unplayable_formats'): + for e in (doc.findall(_add_ns('drmAdditionalHeader')) + + doc.findall(_add_ns('drmAdditionalHeaderSet'))): + # If id attribute is missing it's valid for all media nodes + # without drmAdditionalHeaderId or drmAdditionalHeaderSetId attribute + if 'id' not in e.attrib: + self.report_error('Missing ID in f4m DRM') + media = remove_encrypted_media(media) + if not media: + self.report_error('Unsupported DRM') + return media + + def _get_bootstrap_from_url(self, bootstrap_url): + bootstrap = self.ydl.urlopen(bootstrap_url).read() + return read_bootstrap_info(bootstrap) + + def _update_live_fragments(self, bootstrap_url, latest_fragment): + fragments_list = [] + retries = 30 + while (not fragments_list) and (retries > 0): + boot_info = self._get_bootstrap_from_url(bootstrap_url) + fragments_list = build_fragments_list(boot_info) + fragments_list = [f for f in fragments_list if f[1] > latest_fragment] + if not fragments_list: + # Retry after a while + time.sleep(5.0) + retries -= 1 + + if not fragments_list: + self.report_error('Failed to update fragments') + + return fragments_list + + def _parse_bootstrap_node(self, node, base_url): + # Sometimes non empty inline bootstrap info can be specified along + # with bootstrap url attribute (e.g. dummy inline bootstrap info + # contains whitespace characters in [1]). We will prefer bootstrap + # url over inline bootstrap info when present. + # 1. http://live-1-1.rutube.ru/stream/1024/HDS/SD/C2NKsS85HQNckgn5HdEmOQ/1454167650/S-s604419906/move/four/dirs/upper/1024-576p.f4m + bootstrap_url = node.get('url') + if bootstrap_url: + bootstrap_url = urllib.parse.urljoin( + base_url, bootstrap_url) + boot_info = self._get_bootstrap_from_url(bootstrap_url) + else: + bootstrap_url = None + bootstrap = base64.b64decode(node.text) + boot_info = read_bootstrap_info(bootstrap) + return boot_info, bootstrap_url + + def real_download(self, filename, info_dict): + man_url = info_dict['url'] + requested_bitrate = info_dict.get('tbr') + self.to_screen('[%s] Downloading f4m manifest' % self.FD_NAME) + + urlh = self.ydl.urlopen(self._prepare_url(info_dict, man_url)) + man_url = urlh.url + # Some manifests may be malformed, e.g. prosiebensat1 generated manifests + # (see https://github.com/ytdl-org/youtube-dl/issues/6215#issuecomment-121704244 + # and https://github.com/ytdl-org/youtube-dl/issues/7823) + manifest = fix_xml_ampersands(urlh.read().decode('utf-8', 'ignore')).strip() + + doc = compat_etree_fromstring(manifest) + formats = [(int(f.attrib.get('bitrate', -1)), f) + for f in self._get_unencrypted_media(doc)] + if requested_bitrate is None or len(formats) == 1: + # get the best format + formats = sorted(formats, key=lambda f: f[0]) + rate, media = formats[-1] + else: + rate, media = list(filter( + lambda f: int(f[0]) == requested_bitrate, formats))[0] + + # Prefer baseURL for relative URLs as per 11.2 of F4M 3.0 spec. + man_base_url = get_base_url(doc) or man_url + + base_url = urllib.parse.urljoin(man_base_url, media.attrib['url']) + bootstrap_node = doc.find(_add_ns('bootstrapInfo')) + boot_info, bootstrap_url = self._parse_bootstrap_node( + bootstrap_node, man_base_url) + live = boot_info['live'] + metadata_node = media.find(_add_ns('metadata')) + if metadata_node is not None: + metadata = base64.b64decode(metadata_node.text) + else: + metadata = None + + fragments_list = build_fragments_list(boot_info) + test = self.params.get('test', False) + if test: + # We only download the first fragment + fragments_list = fragments_list[:1] + total_frags = len(fragments_list) + # For some akamai manifests we'll need to add a query to the fragment url + akamai_pv = xpath_text(doc, _add_ns('pv-2.0')) + + ctx = { + 'filename': filename, + 'total_frags': total_frags, + 'live': bool(live), + } + + self._prepare_frag_download(ctx) + + dest_stream = ctx['dest_stream'] + + if ctx['complete_frags_downloaded_bytes'] == 0: + write_flv_header(dest_stream) + if not live: + write_metadata_tag(dest_stream, metadata) + + base_url_parsed = urllib.parse.urlparse(base_url) + + self._start_frag_download(ctx, info_dict) + + frag_index = 0 + while fragments_list: + seg_i, frag_i = fragments_list.pop(0) + frag_index += 1 + if frag_index <= ctx['fragment_index']: + continue + name = 'Seg%d-Frag%d' % (seg_i, frag_i) + query = [] + if base_url_parsed.query: + query.append(base_url_parsed.query) + if akamai_pv: + query.append(akamai_pv.strip(';')) + if info_dict.get('extra_param_to_segment_url'): + query.append(info_dict['extra_param_to_segment_url']) + url_parsed = base_url_parsed._replace(path=base_url_parsed.path + name, query='&'.join(query)) + try: + success = self._download_fragment(ctx, url_parsed.geturl(), info_dict) + if not success: + return False + down_data = self._read_fragment(ctx) + reader = FlvReader(down_data) + while True: + try: + _, box_type, box_data = reader.read_box_info() + except DataTruncatedError: + if test: + # In tests, segments may be truncated, and thus + # FlvReader may not be able to parse the whole + # chunk. If so, write the segment as is + # See https://github.com/ytdl-org/youtube-dl/issues/9214 + dest_stream.write(down_data) + break + raise + if box_type == b'mdat': + self._append_fragment(ctx, box_data) + break + except HTTPError as err: + if live and (err.status == 404 or err.status == 410): + # We didn't keep up with the live window. Continue + # with the next available fragment. + msg = 'Fragment %d unavailable' % frag_i + self.report_warning(msg) + fragments_list = [] + else: + raise + + if not fragments_list and not test and live and bootstrap_url: + fragments_list = self._update_live_fragments(bootstrap_url, frag_i) + total_frags += len(fragments_list) + if fragments_list and (fragments_list[0][1] > frag_i + 1): + msg = 'Missed %d fragments' % (fragments_list[0][1] - (frag_i + 1)) + self.report_warning(msg) + + return self._finish_frag_download(ctx, info_dict) diff --git a/yt_dlp/downloader/fc2.py b/yt_dlp/downloader/fc2.py new file mode 100644 index 0000000..f9763de --- /dev/null +++ b/yt_dlp/downloader/fc2.py @@ -0,0 +1,46 @@ +import threading + +from .common import FileDownloader +from .external import FFmpegFD + + +class FC2LiveFD(FileDownloader): + """ + Downloads FC2 live without being stopped. <br> + Note, this is not a part of public API, and will be removed without notice. + DO NOT USE + """ + + def real_download(self, filename, info_dict): + ws = info_dict['ws'] + + heartbeat_lock = threading.Lock() + heartbeat_state = [None, 1] + + def heartbeat(): + if heartbeat_state[1] < 0: + return + + try: + heartbeat_state[1] += 1 + ws.send('{"name":"heartbeat","arguments":{},"id":%d}' % heartbeat_state[1]) + except Exception: + self.to_screen('[fc2:live] Heartbeat failed') + + with heartbeat_lock: + heartbeat_state[0] = threading.Timer(30, heartbeat) + heartbeat_state[0]._daemonic = True + heartbeat_state[0].start() + + heartbeat() + + new_info_dict = info_dict.copy() + new_info_dict.update({ + 'ws': None, + 'protocol': 'live_ffmpeg', + }) + try: + return FFmpegFD(self.ydl, self.params or {}).download(filename, new_info_dict) + finally: + # stop heartbeating + heartbeat_state[1] = -1 diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py new file mode 100644 index 0000000..b4f003d --- /dev/null +++ b/yt_dlp/downloader/fragment.py @@ -0,0 +1,527 @@ +import concurrent.futures +import contextlib +import json +import math +import os +import struct +import time + +from .common import FileDownloader +from .http import HttpFD +from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7 +from ..compat import compat_os_name +from ..networking import Request +from ..networking.exceptions import HTTPError, IncompleteRead +from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj +from ..utils.networking import HTTPHeaderDict +from ..utils.progress import ProgressCalculator + + +class HttpQuietDownloader(HttpFD): + def to_screen(self, *args, **kargs): + pass + + to_console_title = to_screen + + +class FragmentFD(FileDownloader): + """ + A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests). + + Available options: + + fragment_retries: Number of times to retry a fragment for HTTP error + (DASH and hlsnative only). Default is 0 for API, but 10 for CLI + skip_unavailable_fragments: + Skip unavailable fragments (DASH and hlsnative only) + keep_fragments: Keep downloaded fragments on disk after downloading is + finished + concurrent_fragment_downloads: The number of threads to use for native hls and dash downloads + _no_ytdl_file: Don't use .ytdl file + + For each incomplete fragment download yt-dlp keeps on disk a special + bookkeeping file with download state and metadata (in future such files will + be used for any incomplete download handled by yt-dlp). This file is + used to properly handle resuming, check download file consistency and detect + potential errors. The file has a .ytdl extension and represents a standard + JSON file of the following format: + + extractor: + Dictionary of extractor related data. TBD. + + downloader: + Dictionary of downloader related data. May contain following data: + current_fragment: + Dictionary with current (being downloaded) fragment data: + index: 0-based index of current fragment among all fragments + fragment_count: + Total count of fragments + + This feature is experimental and file format may change in future. + """ + + def report_retry_fragment(self, err, frag_index, count, retries): + self.deprecation_warning('yt_dlp.downloader.FragmentFD.report_retry_fragment is deprecated. ' + 'Use yt_dlp.downloader.FileDownloader.report_retry instead') + return self.report_retry(err, count, retries, frag_index) + + def report_skip_fragment(self, frag_index, err=None): + err = f' {err};' if err else '' + self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...') + + def _prepare_url(self, info_dict, url): + headers = info_dict.get('http_headers') + return Request(url, None, headers) if headers else url + + def _prepare_and_start_frag_download(self, ctx, info_dict): + self._prepare_frag_download(ctx) + self._start_frag_download(ctx, info_dict) + + def __do_ytdl_file(self, ctx): + return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file') + + def _read_ytdl_file(self, ctx): + assert 'ytdl_corrupt' not in ctx + stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'r') + try: + ytdl_data = json.loads(stream.read()) + ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index'] + if 'extra_state' in ytdl_data['downloader']: + ctx['extra_state'] = ytdl_data['downloader']['extra_state'] + except Exception: + ctx['ytdl_corrupt'] = True + finally: + stream.close() + + def _write_ytdl_file(self, ctx): + frag_index_stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'w') + try: + downloader = { + 'current_fragment': { + 'index': ctx['fragment_index'], + }, + } + if 'extra_state' in ctx: + downloader['extra_state'] = ctx['extra_state'] + if ctx.get('fragment_count') is not None: + downloader['fragment_count'] = ctx['fragment_count'] + frag_index_stream.write(json.dumps({'downloader': downloader})) + finally: + frag_index_stream.close() + + def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None): + fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index']) + fragment_info_dict = { + 'url': frag_url, + 'http_headers': headers or info_dict.get('http_headers'), + 'request_data': request_data, + 'ctx_id': ctx.get('ctx_id'), + } + frag_resume_len = 0 + if ctx['dl'].params.get('continuedl', True): + frag_resume_len = self.filesize_or_none(self.temp_name(fragment_filename)) + fragment_info_dict['frag_resume_len'] = ctx['frag_resume_len'] = frag_resume_len + + success, _ = ctx['dl'].download(fragment_filename, fragment_info_dict) + if not success: + return False + if fragment_info_dict.get('filetime'): + ctx['fragment_filetime'] = fragment_info_dict.get('filetime') + ctx['fragment_filename_sanitized'] = fragment_filename + return True + + def _read_fragment(self, ctx): + if not ctx.get('fragment_filename_sanitized'): + return None + try: + down, frag_sanitized = self.sanitize_open(ctx['fragment_filename_sanitized'], 'rb') + except FileNotFoundError: + if ctx.get('live'): + return None + raise + ctx['fragment_filename_sanitized'] = frag_sanitized + frag_content = down.read() + down.close() + return frag_content + + def _append_fragment(self, ctx, frag_content): + try: + ctx['dest_stream'].write(frag_content) + ctx['dest_stream'].flush() + finally: + if self.__do_ytdl_file(ctx): + self._write_ytdl_file(ctx) + if not self.params.get('keep_fragments', False): + self.try_remove(encodeFilename(ctx['fragment_filename_sanitized'])) + del ctx['fragment_filename_sanitized'] + + def _prepare_frag_download(self, ctx): + if not ctx.setdefault('live', False): + total_frags_str = '%d' % ctx['total_frags'] + ad_frags = ctx.get('ad_frags', 0) + if ad_frags: + total_frags_str += ' (not including %d ad)' % ad_frags + else: + total_frags_str = 'unknown (live)' + self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}') + self.report_destination(ctx['filename']) + dl = HttpQuietDownloader(self.ydl, { + **self.params, + 'noprogress': True, + 'test': False, + 'sleep_interval': 0, + 'max_sleep_interval': 0, + 'sleep_interval_subtitles': 0, + }) + tmpfilename = self.temp_name(ctx['filename']) + open_mode = 'wb' + + # Establish possible resume length + resume_len = self.filesize_or_none(tmpfilename) + if resume_len > 0: + open_mode = 'ab' + + # Should be initialized before ytdl file check + ctx.update({ + 'tmpfilename': tmpfilename, + 'fragment_index': 0, + }) + + if self.__do_ytdl_file(ctx): + ytdl_file_exists = os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))) + continuedl = self.params.get('continuedl', True) + if continuedl and ytdl_file_exists: + self._read_ytdl_file(ctx) + is_corrupt = ctx.get('ytdl_corrupt') is True + is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0 + if is_corrupt or is_inconsistent: + message = ( + '.ytdl file is corrupt' if is_corrupt else + 'Inconsistent state of incomplete fragment download') + self.report_warning( + '%s. Restarting from the beginning ...' % message) + ctx['fragment_index'] = resume_len = 0 + if 'ytdl_corrupt' in ctx: + del ctx['ytdl_corrupt'] + self._write_ytdl_file(ctx) + + else: + if not continuedl: + if ytdl_file_exists: + self._read_ytdl_file(ctx) + ctx['fragment_index'] = resume_len = 0 + self._write_ytdl_file(ctx) + assert ctx['fragment_index'] == 0 + + dest_stream, tmpfilename = self.sanitize_open(tmpfilename, open_mode) + + ctx.update({ + 'dl': dl, + 'dest_stream': dest_stream, + 'tmpfilename': tmpfilename, + # Total complete fragments downloaded so far in bytes + 'complete_frags_downloaded_bytes': resume_len, + }) + + def _start_frag_download(self, ctx, info_dict): + resume_len = ctx['complete_frags_downloaded_bytes'] + total_frags = ctx['total_frags'] + ctx_id = ctx.get('ctx_id') + # Stores the download progress, updated by the progress hook + state = { + 'status': 'downloading', + 'downloaded_bytes': resume_len, + 'fragment_index': ctx['fragment_index'], + 'fragment_count': total_frags, + 'filename': ctx['filename'], + 'tmpfilename': ctx['tmpfilename'], + } + + ctx['started'] = time.time() + progress = ProgressCalculator(resume_len) + + def frag_progress_hook(s): + if s['status'] not in ('downloading', 'finished'): + return + + if not total_frags and ctx.get('fragment_count'): + state['fragment_count'] = ctx['fragment_count'] + + if ctx_id is not None and s.get('ctx_id') != ctx_id: + return + + state['max_progress'] = ctx.get('max_progress') + state['progress_idx'] = ctx.get('progress_idx') + + state['elapsed'] = progress.elapsed + frag_total_bytes = s.get('total_bytes') or 0 + s['fragment_info_dict'] = s.pop('info_dict', {}) + + # XXX: Fragment resume is not accounted for here + if not ctx['live']: + estimated_size = ( + (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes) + / (state['fragment_index'] + 1) * total_frags) + progress.total = estimated_size + progress.update(s.get('downloaded_bytes')) + state['total_bytes_estimate'] = progress.total + else: + progress.update(s.get('downloaded_bytes')) + + if s['status'] == 'finished': + state['fragment_index'] += 1 + ctx['fragment_index'] = state['fragment_index'] + progress.thread_reset() + + state['downloaded_bytes'] = ctx['complete_frags_downloaded_bytes'] = progress.downloaded + state['speed'] = ctx['speed'] = progress.speed.smooth + state['eta'] = progress.eta.smooth + + self._hook_progress(state, info_dict) + + ctx['dl'].add_progress_hook(frag_progress_hook) + + return ctx['started'] + + def _finish_frag_download(self, ctx, info_dict): + ctx['dest_stream'].close() + if self.__do_ytdl_file(ctx): + self.try_remove(self.ytdl_filename(ctx['filename'])) + elapsed = time.time() - ctx['started'] + + to_file = ctx['tmpfilename'] != '-' + if to_file: + downloaded_bytes = self.filesize_or_none(ctx['tmpfilename']) + else: + downloaded_bytes = ctx['complete_frags_downloaded_bytes'] + + if not downloaded_bytes: + if to_file: + self.try_remove(ctx['tmpfilename']) + self.report_error('The downloaded file is empty') + return False + elif to_file: + self.try_rename(ctx['tmpfilename'], ctx['filename']) + filetime = ctx.get('fragment_filetime') + if self.params.get('updatetime', True) and filetime: + with contextlib.suppress(Exception): + os.utime(ctx['filename'], (time.time(), filetime)) + + self._hook_progress({ + 'downloaded_bytes': downloaded_bytes, + 'total_bytes': downloaded_bytes, + 'filename': ctx['filename'], + 'status': 'finished', + 'elapsed': elapsed, + 'ctx_id': ctx.get('ctx_id'), + 'max_progress': ctx.get('max_progress'), + 'progress_idx': ctx.get('progress_idx'), + }, info_dict) + return True + + def _prepare_external_frag_download(self, ctx): + if 'live' not in ctx: + ctx['live'] = False + if not ctx['live']: + total_frags_str = '%d' % ctx['total_frags'] + ad_frags = ctx.get('ad_frags', 0) + if ad_frags: + total_frags_str += ' (not including %d ad)' % ad_frags + else: + total_frags_str = 'unknown (live)' + self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}') + + tmpfilename = self.temp_name(ctx['filename']) + + # Should be initialized before ytdl file check + ctx.update({ + 'tmpfilename': tmpfilename, + 'fragment_index': 0, + }) + + def decrypter(self, info_dict): + _key_cache = {} + + def _get_key(url): + if url not in _key_cache: + _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read() + return _key_cache[url] + + def decrypt_fragment(fragment, frag_content): + if frag_content is None: + return + decrypt_info = fragment.get('decrypt_info') + if not decrypt_info or decrypt_info['METHOD'] != 'AES-128': + return frag_content + iv = decrypt_info.get('IV') or struct.pack('>8xq', fragment['media_sequence']) + decrypt_info['KEY'] = (decrypt_info.get('KEY') + or _get_key(traverse_obj(info_dict, ('hls_aes', 'uri')) or decrypt_info['URI'])) + # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block + # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded, + # not what it decrypts to. + if self.params.get('test', False): + return frag_content + return unpad_pkcs7(aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv)) + + return decrypt_fragment + + def download_and_append_fragments_multiple(self, *args, **kwargs): + ''' + @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ... + all args must be either tuple or list + ''' + interrupt_trigger = [True] + max_progress = len(args) + if max_progress == 1: + return self.download_and_append_fragments(*args[0], **kwargs) + max_workers = self.params.get('concurrent_fragment_downloads', 1) + if max_progress > 1: + self._prepare_multiline_status(max_progress) + is_live = any(traverse_obj(args, (..., 2, 'is_live'))) + + def thread_func(idx, ctx, fragments, info_dict, tpe): + ctx['max_progress'] = max_progress + ctx['progress_idx'] = idx + return self.download_and_append_fragments( + ctx, fragments, info_dict, **kwargs, tpe=tpe, interrupt_trigger=interrupt_trigger) + + class FTPE(concurrent.futures.ThreadPoolExecutor): + # has to stop this or it's going to wait on the worker thread itself + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + if compat_os_name == 'nt': + def future_result(future): + while True: + try: + return future.result(0.1) + except KeyboardInterrupt: + raise + except concurrent.futures.TimeoutError: + continue + else: + def future_result(future): + return future.result() + + def interrupt_trigger_iter(fg): + for f in fg: + if not interrupt_trigger[0]: + break + yield f + + spins = [] + for idx, (ctx, fragments, info_dict) in enumerate(args): + tpe = FTPE(math.ceil(max_workers / max_progress)) + job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe) + spins.append((tpe, job)) + + result = True + for tpe, job in spins: + try: + result = result and future_result(job) + except KeyboardInterrupt: + interrupt_trigger[0] = False + finally: + tpe.shutdown(wait=True) + if not interrupt_trigger[0] and not is_live: + raise KeyboardInterrupt() + # we expect the user wants to stop and DO WANT the preceding postprocessors to run; + # so returning a intermediate result here instead of KeyboardInterrupt on live + return result + + def download_and_append_fragments( + self, ctx, fragments, info_dict, *, is_fatal=(lambda idx: False), + pack_func=(lambda content, idx: content), finish_func=None, + tpe=None, interrupt_trigger=(True, )): + + if not self.params.get('skip_unavailable_fragments', True): + is_fatal = lambda _: True + + def download_fragment(fragment, ctx): + if not interrupt_trigger[0]: + return + + frag_index = ctx['fragment_index'] = fragment['frag_index'] + ctx['last_error'] = None + headers = HTTPHeaderDict(info_dict.get('http_headers')) + byte_range = fragment.get('byte_range') + if byte_range: + headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1) + + # Never skip the first fragment + fatal = is_fatal(fragment.get('index') or (frag_index - 1)) + + def error_callback(err, count, retries): + if fatal and count > retries: + ctx['dest_stream'].close() + self.report_retry(err, count, retries, frag_index, fatal) + ctx['last_error'] = err + + for retry in RetryManager(self.params.get('fragment_retries'), error_callback): + try: + ctx['fragment_count'] = fragment.get('fragment_count') + if not self._download_fragment( + ctx, fragment['url'], info_dict, headers, info_dict.get('request_data')): + return + except (HTTPError, IncompleteRead) as err: + retry.error = err + continue + except DownloadError: # has own retry settings + if fatal: + raise + + def append_fragment(frag_content, frag_index, ctx): + if frag_content: + self._append_fragment(ctx, pack_func(frag_content, frag_index)) + elif not is_fatal(frag_index - 1): + self.report_skip_fragment(frag_index, 'fragment not found') + else: + ctx['dest_stream'].close() + self.report_error(f'fragment {frag_index} not found, unable to continue') + return False + return True + + decrypt_fragment = self.decrypter(info_dict) + + max_workers = math.ceil( + self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1)) + if max_workers > 1: + def _download_fragment(fragment): + ctx_copy = ctx.copy() + download_fragment(fragment, ctx_copy) + return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized') + + with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool: + try: + for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments): + ctx.update({ + 'fragment_filename_sanitized': frag_filename, + 'fragment_index': frag_index, + }) + if not append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx): + return False + except KeyboardInterrupt: + self._finish_multiline_status() + self.report_error( + 'Interrupted by user. Waiting for all threads to shutdown...', is_error=False, tb=False) + pool.shutdown(wait=False) + raise + else: + for fragment in fragments: + if not interrupt_trigger[0]: + break + try: + download_fragment(fragment, ctx) + result = append_fragment( + decrypt_fragment(fragment, self._read_fragment(ctx)), fragment['frag_index'], ctx) + except KeyboardInterrupt: + if info_dict.get('is_live'): + break + raise + if not result: + return False + + if finish_func is not None: + ctx['dest_stream'].write(finish_func()) + ctx['dest_stream'].flush() + return self._finish_frag_download(ctx, info_dict) diff --git a/yt_dlp/downloader/hls.py b/yt_dlp/downloader/hls.py new file mode 100644 index 0000000..4ac5d99 --- /dev/null +++ b/yt_dlp/downloader/hls.py @@ -0,0 +1,378 @@ +import binascii +import io +import re +import urllib.parse + +from . import get_suitable_downloader +from .external import FFmpegFD +from .fragment import FragmentFD +from .. import webvtt +from ..dependencies import Cryptodome +from ..utils import ( + bug_reports_message, + parse_m3u8_attributes, + remove_start, + traverse_obj, + update_url_query, + urljoin, +) + + +class HlsFD(FragmentFD): + """ + Download segments in a m3u8 manifest. External downloaders can take over + the fragment downloads by supporting the 'm3u8_frag_urls' protocol and + re-defining 'supports_manifest' function + """ + + FD_NAME = 'hlsnative' + + @staticmethod + def _has_drm(manifest): # TODO: https://github.com/yt-dlp/yt-dlp/pull/5039 + return bool(re.search('|'.join(( + r'#EXT-X-(?:SESSION-)?KEY:.*?URI="skd://', # Apple FairPlay + r'#EXT-X-(?:SESSION-)?KEY:.*?KEYFORMAT="com\.apple\.streamingkeydelivery"', # Apple FairPlay + r'#EXT-X-(?:SESSION-)?KEY:.*?KEYFORMAT="com\.microsoft\.playready"', # Microsoft PlayReady + r'#EXT-X-FAXS-CM:', # Adobe Flash Access + )), manifest)) + + @classmethod + def can_download(cls, manifest, info_dict, allow_unplayable_formats=False): + UNSUPPORTED_FEATURES = [ + # r'#EXT-X-BYTERANGE', # playlists composed of byte ranges of media files [2] + + # Live streams heuristic does not always work (e.g. geo restricted to Germany + # http://hls-geo.daserste.de/i/videoportal/Film/c_620000/622873/format,716451,716457,716450,716458,716459,.mp4.csmil/index_4_av.m3u8?null=0) + # r'#EXT-X-MEDIA-SEQUENCE:(?!0$)', # live streams [3] + + # This heuristic also is not correct since segments may not be appended as well. + # Twitch vods of finished streams have EXT-X-PLAYLIST-TYPE:EVENT despite + # no segments will definitely be appended to the end of the playlist. + # r'#EXT-X-PLAYLIST-TYPE:EVENT', # media segments may be appended to the end of + # # event media playlists [4] + # r'#EXT-X-MAP:', # media initialization [5] + # 1. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.4 + # 2. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.2 + # 3. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.3.2 + # 4. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.3.5 + # 5. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.5 + ] + if not allow_unplayable_formats: + UNSUPPORTED_FEATURES += [ + r'#EXT-X-KEY:METHOD=(?!NONE|AES-128)', # encrypted streams [1], but not necessarily DRM + ] + + def check_results(): + yield not info_dict.get('is_live') + for feature in UNSUPPORTED_FEATURES: + yield not re.search(feature, manifest) + if not allow_unplayable_formats: + yield not cls._has_drm(manifest) + return all(check_results()) + + def real_download(self, filename, info_dict): + man_url = info_dict['url'] + self.to_screen('[%s] Downloading m3u8 manifest' % self.FD_NAME) + + urlh = self.ydl.urlopen(self._prepare_url(info_dict, man_url)) + man_url = urlh.url + s = urlh.read().decode('utf-8', 'ignore') + + can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None + if can_download: + has_ffmpeg = FFmpegFD.available() + no_crypto = not Cryptodome.AES and '#EXT-X-KEY:METHOD=AES-128' in s + if no_crypto and has_ffmpeg: + can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available' + elif no_crypto: + message = ('The stream has AES-128 encryption and neither ffmpeg nor pycryptodomex are available; ' + 'Decryption will be performed natively, but will be extremely slow') + elif info_dict.get('extractor_key') == 'Generic' and re.search(r'(?m)#EXT-X-MEDIA-SEQUENCE:(?!0$)', s): + install_ffmpeg = '' if has_ffmpeg else 'install ffmpeg and ' + message = ('Live HLS streams are not supported by the native downloader. If this is a livestream, ' + f'please {install_ffmpeg}add "--downloader ffmpeg --hls-use-mpegts" to your command') + if not can_download: + if self._has_drm(s) and not self.params.get('allow_unplayable_formats'): + if info_dict.get('has_drm') and self.params.get('test'): + self.to_screen(f'[{self.FD_NAME}] This format is DRM protected', skip_eol=True) + else: + self.report_error( + 'This format is DRM protected; Try selecting another format with --format or ' + 'add --check-formats to automatically fallback to the next best format', tb=False) + return False + message = message or 'Unsupported features have been detected' + fd = FFmpegFD(self.ydl, self.params) + self.report_warning(f'{message}; extraction will be delegated to {fd.get_basename()}') + return fd.real_download(filename, info_dict) + elif message: + self.report_warning(message) + + is_webvtt = info_dict['ext'] == 'vtt' + if is_webvtt: + real_downloader = None # Packing the fragments is not currently supported for external downloader + else: + real_downloader = get_suitable_downloader( + info_dict, self.params, None, protocol='m3u8_frag_urls', to_stdout=(filename == '-')) + if real_downloader and not real_downloader.supports_manifest(s): + real_downloader = None + if real_downloader: + self.to_screen(f'[{self.FD_NAME}] Fragment downloads will be delegated to {real_downloader.get_basename()}') + + def is_ad_fragment_start(s): + return (s.startswith('#ANVATO-SEGMENT-INFO') and 'type=ad' in s + or s.startswith('#UPLYNK-SEGMENT') and s.endswith(',ad')) + + def is_ad_fragment_end(s): + return (s.startswith('#ANVATO-SEGMENT-INFO') and 'type=master' in s + or s.startswith('#UPLYNK-SEGMENT') and s.endswith(',segment')) + + fragments = [] + + media_frags = 0 + ad_frags = 0 + ad_frag_next = False + for line in s.splitlines(): + line = line.strip() + if not line: + continue + if line.startswith('#'): + if is_ad_fragment_start(line): + ad_frag_next = True + elif is_ad_fragment_end(line): + ad_frag_next = False + continue + if ad_frag_next: + ad_frags += 1 + continue + media_frags += 1 + + ctx = { + 'filename': filename, + 'total_frags': media_frags, + 'ad_frags': ad_frags, + } + + if real_downloader: + self._prepare_external_frag_download(ctx) + else: + self._prepare_and_start_frag_download(ctx, info_dict) + + extra_state = ctx.setdefault('extra_state', {}) + + format_index = info_dict.get('format_index') + extra_query = None + extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url') + if extra_param_to_segment_url: + extra_query = urllib.parse.parse_qs(extra_param_to_segment_url) + i = 0 + media_sequence = 0 + decrypt_info = {'METHOD': 'NONE'} + external_aes_key = traverse_obj(info_dict, ('hls_aes', 'key')) + if external_aes_key: + external_aes_key = binascii.unhexlify(remove_start(external_aes_key, '0x')) + assert len(external_aes_key) in (16, 24, 32), 'Invalid length for HLS AES-128 key' + external_aes_iv = traverse_obj(info_dict, ('hls_aes', 'iv')) + if external_aes_iv: + external_aes_iv = binascii.unhexlify(remove_start(external_aes_iv, '0x').zfill(32)) + byte_range = {} + discontinuity_count = 0 + frag_index = 0 + ad_frag_next = False + for line in s.splitlines(): + line = line.strip() + if line: + if not line.startswith('#'): + if format_index and discontinuity_count != format_index: + continue + if ad_frag_next: + continue + frag_index += 1 + if frag_index <= ctx['fragment_index']: + continue + frag_url = urljoin(man_url, line) + if extra_query: + frag_url = update_url_query(frag_url, extra_query) + + fragments.append({ + 'frag_index': frag_index, + 'url': frag_url, + 'decrypt_info': decrypt_info, + 'byte_range': byte_range, + 'media_sequence': media_sequence, + }) + media_sequence += 1 + + elif line.startswith('#EXT-X-MAP'): + if format_index and discontinuity_count != format_index: + continue + if frag_index > 0: + self.report_error( + 'Initialization fragment found after media fragments, unable to download') + return False + frag_index += 1 + map_info = parse_m3u8_attributes(line[11:]) + frag_url = urljoin(man_url, map_info.get('URI')) + if extra_query: + frag_url = update_url_query(frag_url, extra_query) + + if map_info.get('BYTERANGE'): + splitted_byte_range = map_info.get('BYTERANGE').split('@') + sub_range_start = int(splitted_byte_range[1]) if len(splitted_byte_range) == 2 else byte_range['end'] + byte_range = { + 'start': sub_range_start, + 'end': sub_range_start + int(splitted_byte_range[0]), + } + + fragments.append({ + 'frag_index': frag_index, + 'url': frag_url, + 'decrypt_info': decrypt_info, + 'byte_range': byte_range, + 'media_sequence': media_sequence + }) + media_sequence += 1 + + elif line.startswith('#EXT-X-KEY'): + decrypt_url = decrypt_info.get('URI') + decrypt_info = parse_m3u8_attributes(line[11:]) + if decrypt_info['METHOD'] == 'AES-128': + if external_aes_iv: + decrypt_info['IV'] = external_aes_iv + elif 'IV' in decrypt_info: + decrypt_info['IV'] = binascii.unhexlify(decrypt_info['IV'][2:].zfill(32)) + if external_aes_key: + decrypt_info['KEY'] = external_aes_key + else: + decrypt_info['URI'] = urljoin(man_url, decrypt_info['URI']) + if extra_query: + decrypt_info['URI'] = update_url_query(decrypt_info['URI'], extra_query) + if decrypt_url != decrypt_info['URI']: + decrypt_info['KEY'] = None + + elif line.startswith('#EXT-X-MEDIA-SEQUENCE'): + media_sequence = int(line[22:]) + elif line.startswith('#EXT-X-BYTERANGE'): + splitted_byte_range = line[17:].split('@') + sub_range_start = int(splitted_byte_range[1]) if len(splitted_byte_range) == 2 else byte_range['end'] + byte_range = { + 'start': sub_range_start, + 'end': sub_range_start + int(splitted_byte_range[0]), + } + elif is_ad_fragment_start(line): + ad_frag_next = True + elif is_ad_fragment_end(line): + ad_frag_next = False + elif line.startswith('#EXT-X-DISCONTINUITY'): + discontinuity_count += 1 + i += 1 + + # We only download the first fragment during the test + if self.params.get('test', False): + fragments = [fragments[0] if fragments else None] + + if real_downloader: + info_dict['fragments'] = fragments + fd = real_downloader(self.ydl, self.params) + # TODO: Make progress updates work without hooking twice + # for ph in self._progress_hooks: + # fd.add_progress_hook(ph) + return fd.real_download(filename, info_dict) + + if is_webvtt: + def pack_fragment(frag_content, frag_index): + output = io.StringIO() + adjust = 0 + overflow = False + mpegts_last = None + for block in webvtt.parse_fragment(frag_content): + if isinstance(block, webvtt.CueBlock): + extra_state['webvtt_mpegts_last'] = mpegts_last + if overflow: + extra_state['webvtt_mpegts_adjust'] += 1 + overflow = False + block.start += adjust + block.end += adjust + + dedup_window = extra_state.setdefault('webvtt_dedup_window', []) + + ready = [] + + i = 0 + is_new = True + while i < len(dedup_window): + wcue = dedup_window[i] + wblock = webvtt.CueBlock.from_json(wcue) + i += 1 + if wblock.hinges(block): + wcue['end'] = block.end + is_new = False + continue + if wblock == block: + is_new = False + continue + if wblock.end > block.start: + continue + ready.append(wblock) + i -= 1 + del dedup_window[i] + + if is_new: + dedup_window.append(block.as_json) + for block in ready: + block.write_into(output) + + # we only emit cues once they fall out of the duplicate window + continue + elif isinstance(block, webvtt.Magic): + # take care of MPEG PES timestamp overflow + if block.mpegts is None: + block.mpegts = 0 + extra_state.setdefault('webvtt_mpegts_adjust', 0) + block.mpegts += extra_state['webvtt_mpegts_adjust'] << 33 + if block.mpegts < extra_state.get('webvtt_mpegts_last', 0): + overflow = True + block.mpegts += 1 << 33 + mpegts_last = block.mpegts + + if frag_index == 1: + extra_state['webvtt_mpegts'] = block.mpegts or 0 + extra_state['webvtt_local'] = block.local or 0 + # XXX: block.local = block.mpegts = None ? + else: + if block.mpegts is not None and block.local is not None: + adjust = ( + (block.mpegts - extra_state.get('webvtt_mpegts', 0)) + - (block.local - extra_state.get('webvtt_local', 0)) + ) + continue + elif isinstance(block, webvtt.HeaderBlock): + if frag_index != 1: + # XXX: this should probably be silent as well + # or verify that all segments contain the same data + self.report_warning(bug_reports_message( + 'Discarding a %s block found in the middle of the stream; ' + 'if the subtitles display incorrectly,' + % (type(block).__name__))) + continue + block.write_into(output) + + return output.getvalue().encode() + + def fin_fragments(): + dedup_window = extra_state.get('webvtt_dedup_window') + if not dedup_window: + return b'' + + output = io.StringIO() + for cue in dedup_window: + webvtt.CueBlock.from_json(cue).write_into(output) + + return output.getvalue().encode() + + if len(fragments) == 1: + self.download_and_append_fragments(ctx, fragments, info_dict) + else: + self.download_and_append_fragments( + ctx, fragments, info_dict, pack_func=pack_fragment, finish_func=fin_fragments) + else: + return self.download_and_append_fragments(ctx, fragments, info_dict) diff --git a/yt_dlp/downloader/http.py b/yt_dlp/downloader/http.py new file mode 100644 index 0000000..693828b --- /dev/null +++ b/yt_dlp/downloader/http.py @@ -0,0 +1,383 @@ +import os +import random +import time + +from .common import FileDownloader +from ..networking import Request +from ..networking.exceptions import ( + CertificateVerifyError, + HTTPError, + TransportError, +) +from ..utils import ( + ContentTooShortError, + RetryManager, + ThrottledDownload, + XAttrMetadataError, + XAttrUnavailableError, + encodeFilename, + int_or_none, + parse_http_range, + try_call, + write_xattr, +) +from ..utils.networking import HTTPHeaderDict + + +class HttpFD(FileDownloader): + def real_download(self, filename, info_dict): + url = info_dict['url'] + request_data = info_dict.get('request_data', None) + + class DownloadContext(dict): + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + + ctx = DownloadContext() + ctx.filename = filename + ctx.tmpfilename = self.temp_name(filename) + ctx.stream = None + + # Disable compression + headers = HTTPHeaderDict({'Accept-Encoding': 'identity'}, info_dict.get('http_headers')) + + is_test = self.params.get('test', False) + chunk_size = self._TEST_FILE_SIZE if is_test else ( + self.params.get('http_chunk_size') + or info_dict.get('downloader_options', {}).get('http_chunk_size') + or 0) + + ctx.open_mode = 'wb' + ctx.resume_len = 0 + ctx.block_size = self.params.get('buffersize', 1024) + ctx.start_time = time.time() + + # parse given Range + req_start, req_end, _ = parse_http_range(headers.get('Range')) + + if self.params.get('continuedl', True): + # Establish possible resume length + if os.path.isfile(encodeFilename(ctx.tmpfilename)): + ctx.resume_len = os.path.getsize( + encodeFilename(ctx.tmpfilename)) + + ctx.is_resume = ctx.resume_len > 0 + + class SucceedDownload(Exception): + pass + + class RetryDownload(Exception): + def __init__(self, source_error): + self.source_error = source_error + + class NextFragment(Exception): + pass + + def establish_connection(): + ctx.chunk_size = (random.randint(int(chunk_size * 0.95), chunk_size) + if not is_test and chunk_size else chunk_size) + if ctx.resume_len > 0: + range_start = ctx.resume_len + if req_start is not None: + # offset the beginning of Range to be within request + range_start += req_start + if ctx.is_resume: + self.report_resuming_byte(ctx.resume_len) + ctx.open_mode = 'ab' + elif req_start is not None: + range_start = req_start + elif ctx.chunk_size > 0: + range_start = 0 + else: + range_start = None + ctx.is_resume = False + + if ctx.chunk_size: + chunk_aware_end = range_start + ctx.chunk_size - 1 + # we're not allowed to download outside Range + range_end = chunk_aware_end if req_end is None else min(chunk_aware_end, req_end) + elif req_end is not None: + # there's no need for chunked downloads, so download until the end of Range + range_end = req_end + else: + range_end = None + + if try_call(lambda: range_start > range_end): + ctx.resume_len = 0 + ctx.open_mode = 'wb' + raise RetryDownload(Exception(f'Conflicting range. (start={range_start} > end={range_end})')) + + if try_call(lambda: range_end >= ctx.content_len): + range_end = ctx.content_len - 1 + + request = Request(url, request_data, headers) + has_range = range_start is not None + if has_range: + request.headers['Range'] = f'bytes={int(range_start)}-{int_or_none(range_end) or ""}' + # Establish connection + try: + ctx.data = self.ydl.urlopen(request) + # When trying to resume, Content-Range HTTP header of response has to be checked + # to match the value of requested Range HTTP header. This is due to a webservers + # that don't support resuming and serve a whole file with no Content-Range + # set in response despite of requested Range (see + # https://github.com/ytdl-org/youtube-dl/issues/6057#issuecomment-126129799) + if has_range: + content_range = ctx.data.headers.get('Content-Range') + content_range_start, content_range_end, content_len = parse_http_range(content_range) + # Content-Range is present and matches requested Range, resume is possible + if range_start == content_range_start and ( + # Non-chunked download + not ctx.chunk_size + # Chunked download and requested piece or + # its part is promised to be served + or content_range_end == range_end + or content_len < range_end): + ctx.content_len = content_len + if content_len or req_end: + ctx.data_len = min(content_len or req_end, req_end or content_len) - (req_start or 0) + return + # Content-Range is either not present or invalid. Assuming remote webserver is + # trying to send the whole file, resume is not possible, so wiping the local file + # and performing entire redownload + elif range_start > 0: + self.report_unable_to_resume() + ctx.resume_len = 0 + ctx.open_mode = 'wb' + ctx.data_len = ctx.content_len = int_or_none(ctx.data.headers.get('Content-length', None)) + except HTTPError as err: + if err.status == 416: + # Unable to resume (requested range not satisfiable) + try: + # Open the connection again without the range header + ctx.data = self.ydl.urlopen( + Request(url, request_data, headers)) + content_length = ctx.data.headers['Content-Length'] + except HTTPError as err: + if err.status < 500 or err.status >= 600: + raise + else: + # Examine the reported length + if (content_length is not None + and (ctx.resume_len - 100 < int(content_length) < ctx.resume_len + 100)): + # The file had already been fully downloaded. + # Explanation to the above condition: in issue #175 it was revealed that + # YouTube sometimes adds or removes a few bytes from the end of the file, + # changing the file size slightly and causing problems for some users. So + # I decided to implement a suggested change and consider the file + # completely downloaded if the file size differs less than 100 bytes from + # the one in the hard drive. + self.report_file_already_downloaded(ctx.filename) + self.try_rename(ctx.tmpfilename, ctx.filename) + self._hook_progress({ + 'filename': ctx.filename, + 'status': 'finished', + 'downloaded_bytes': ctx.resume_len, + 'total_bytes': ctx.resume_len, + }, info_dict) + raise SucceedDownload() + else: + # The length does not match, we start the download over + self.report_unable_to_resume() + ctx.resume_len = 0 + ctx.open_mode = 'wb' + return + elif err.status < 500 or err.status >= 600: + # Unexpected HTTP error + raise + raise RetryDownload(err) + except CertificateVerifyError: + raise + except TransportError as err: + raise RetryDownload(err) + + def close_stream(): + if ctx.stream is not None: + if not ctx.tmpfilename == '-': + ctx.stream.close() + ctx.stream = None + + def download(): + data_len = ctx.data.headers.get('Content-length') + + if ctx.data.headers.get('Content-encoding'): + # Content-encoding is present, Content-length is not reliable anymore as we are + # doing auto decompression. (See: https://github.com/yt-dlp/yt-dlp/pull/6176) + data_len = None + + # Range HTTP header may be ignored/unsupported by a webserver + # (e.g. extractor/scivee.py, extractor/bambuser.py). + # However, for a test we still would like to download just a piece of a file. + # To achieve this we limit data_len to _TEST_FILE_SIZE and manually control + # block size when downloading a file. + if is_test and (data_len is None or int(data_len) > self._TEST_FILE_SIZE): + data_len = self._TEST_FILE_SIZE + + if data_len is not None: + data_len = int(data_len) + ctx.resume_len + min_data_len = self.params.get('min_filesize') + max_data_len = self.params.get('max_filesize') + if min_data_len is not None and data_len < min_data_len: + self.to_screen( + f'\r[download] File is smaller than min-filesize ({data_len} bytes < {min_data_len} bytes). Aborting.') + return False + if max_data_len is not None and data_len > max_data_len: + self.to_screen( + f'\r[download] File is larger than max-filesize ({data_len} bytes > {max_data_len} bytes). Aborting.') + return False + + byte_counter = 0 + ctx.resume_len + block_size = ctx.block_size + start = time.time() + + # measure time over whole while-loop, so slow_down() and best_block_size() work together properly + now = None # needed for slow_down() in the first loop run + before = start # start measuring + + def retry(e): + close_stream() + if ctx.tmpfilename == '-': + ctx.resume_len = byte_counter + else: + try: + ctx.resume_len = os.path.getsize(encodeFilename(ctx.tmpfilename)) + except FileNotFoundError: + ctx.resume_len = 0 + raise RetryDownload(e) + + while True: + try: + # Download and write + data_block = ctx.data.read(block_size if not is_test else min(block_size, data_len - byte_counter)) + except TransportError as err: + retry(err) + + byte_counter += len(data_block) + + # exit loop when download is finished + if len(data_block) == 0: + break + + # Open destination file just in time + if ctx.stream is None: + try: + ctx.stream, ctx.tmpfilename = self.sanitize_open( + ctx.tmpfilename, ctx.open_mode) + assert ctx.stream is not None + ctx.filename = self.undo_temp_name(ctx.tmpfilename) + self.report_destination(ctx.filename) + except OSError as err: + self.report_error('unable to open for writing: %s' % str(err)) + return False + + if self.params.get('xattr_set_filesize', False) and data_len is not None: + try: + write_xattr(ctx.tmpfilename, 'user.ytdl.filesize', str(data_len).encode()) + except (XAttrUnavailableError, XAttrMetadataError) as err: + self.report_error('unable to set filesize xattr: %s' % str(err)) + + try: + ctx.stream.write(data_block) + except OSError as err: + self.to_stderr('\n') + self.report_error('unable to write data: %s' % str(err)) + return False + + # Apply rate limit + self.slow_down(start, now, byte_counter - ctx.resume_len) + + # end measuring of one loop run + now = time.time() + after = now + + # Adjust block size + if not self.params.get('noresizebuffer', False): + block_size = self.best_block_size(after - before, len(data_block)) + + before = after + + # Progress message + speed = self.calc_speed(start, now, byte_counter - ctx.resume_len) + if ctx.data_len is None: + eta = None + else: + eta = self.calc_eta(start, time.time(), ctx.data_len - ctx.resume_len, byte_counter - ctx.resume_len) + + self._hook_progress({ + 'status': 'downloading', + 'downloaded_bytes': byte_counter, + 'total_bytes': ctx.data_len, + 'tmpfilename': ctx.tmpfilename, + 'filename': ctx.filename, + 'eta': eta, + 'speed': speed, + 'elapsed': now - ctx.start_time, + 'ctx_id': info_dict.get('ctx_id'), + }, info_dict) + + if data_len is not None and byte_counter == data_len: + break + + if speed and speed < (self.params.get('throttledratelimit') or 0): + # The speed must stay below the limit for 3 seconds + # This prevents raising error when the speed temporarily goes down + if ctx.throttle_start is None: + ctx.throttle_start = now + elif now - ctx.throttle_start > 3: + if ctx.stream is not None and ctx.tmpfilename != '-': + ctx.stream.close() + raise ThrottledDownload() + elif speed: + ctx.throttle_start = None + + if ctx.stream is None: + self.to_stderr('\n') + self.report_error('Did not get any data blocks') + return False + + if not is_test and ctx.chunk_size and ctx.content_len is not None and byte_counter < ctx.content_len: + ctx.resume_len = byte_counter + raise NextFragment() + + if ctx.tmpfilename != '-': + ctx.stream.close() + + if data_len is not None and byte_counter != data_len: + err = ContentTooShortError(byte_counter, int(data_len)) + retry(err) + + self.try_rename(ctx.tmpfilename, ctx.filename) + + # Update file modification time + if self.params.get('updatetime', True): + info_dict['filetime'] = self.try_utime(ctx.filename, ctx.data.headers.get('last-modified', None)) + + self._hook_progress({ + 'downloaded_bytes': byte_counter, + 'total_bytes': byte_counter, + 'filename': ctx.filename, + 'status': 'finished', + 'elapsed': time.time() - ctx.start_time, + 'ctx_id': info_dict.get('ctx_id'), + }, info_dict) + + return True + + for retry in RetryManager(self.params.get('retries'), self.report_retry): + try: + establish_connection() + return download() + except RetryDownload as err: + retry.error = err.source_error + continue + except NextFragment: + retry.error = None + retry.attempt -= 1 + continue + except SucceedDownload: + return True + except: # noqa: E722 + close_stream() + raise + return False diff --git a/yt_dlp/downloader/ism.py b/yt_dlp/downloader/ism.py new file mode 100644 index 0000000..dd688f5 --- /dev/null +++ b/yt_dlp/downloader/ism.py @@ -0,0 +1,283 @@ +import binascii +import io +import struct +import time + +from .fragment import FragmentFD +from ..networking.exceptions import HTTPError +from ..utils import RetryManager + +u8 = struct.Struct('>B') +u88 = struct.Struct('>Bx') +u16 = struct.Struct('>H') +u1616 = struct.Struct('>Hxx') +u32 = struct.Struct('>I') +u64 = struct.Struct('>Q') + +s88 = struct.Struct('>bx') +s16 = struct.Struct('>h') +s1616 = struct.Struct('>hxx') +s32 = struct.Struct('>i') + +unity_matrix = (s32.pack(0x10000) + s32.pack(0) * 3) * 2 + s32.pack(0x40000000) + +TRACK_ENABLED = 0x1 +TRACK_IN_MOVIE = 0x2 +TRACK_IN_PREVIEW = 0x4 + +SELF_CONTAINED = 0x1 + + +def box(box_type, payload): + return u32.pack(8 + len(payload)) + box_type + payload + + +def full_box(box_type, version, flags, payload): + return box(box_type, u8.pack(version) + u32.pack(flags)[1:] + payload) + + +def write_piff_header(stream, params): + track_id = params['track_id'] + fourcc = params['fourcc'] + duration = params['duration'] + timescale = params.get('timescale', 10000000) + language = params.get('language', 'und') + height = params.get('height', 0) + width = params.get('width', 0) + stream_type = params['stream_type'] + creation_time = modification_time = int(time.time()) + + ftyp_payload = b'isml' # major brand + ftyp_payload += u32.pack(1) # minor version + ftyp_payload += b'piff' + b'iso2' # compatible brands + stream.write(box(b'ftyp', ftyp_payload)) # File Type Box + + mvhd_payload = u64.pack(creation_time) + mvhd_payload += u64.pack(modification_time) + mvhd_payload += u32.pack(timescale) + mvhd_payload += u64.pack(duration) + mvhd_payload += s1616.pack(1) # rate + mvhd_payload += s88.pack(1) # volume + mvhd_payload += u16.pack(0) # reserved + mvhd_payload += u32.pack(0) * 2 # reserved + mvhd_payload += unity_matrix + mvhd_payload += u32.pack(0) * 6 # pre defined + mvhd_payload += u32.pack(0xffffffff) # next track id + moov_payload = full_box(b'mvhd', 1, 0, mvhd_payload) # Movie Header Box + + tkhd_payload = u64.pack(creation_time) + tkhd_payload += u64.pack(modification_time) + tkhd_payload += u32.pack(track_id) # track id + tkhd_payload += u32.pack(0) # reserved + tkhd_payload += u64.pack(duration) + tkhd_payload += u32.pack(0) * 2 # reserved + tkhd_payload += s16.pack(0) # layer + tkhd_payload += s16.pack(0) # alternate group + tkhd_payload += s88.pack(1 if stream_type == 'audio' else 0) # volume + tkhd_payload += u16.pack(0) # reserved + tkhd_payload += unity_matrix + tkhd_payload += u1616.pack(width) + tkhd_payload += u1616.pack(height) + trak_payload = full_box(b'tkhd', 1, TRACK_ENABLED | TRACK_IN_MOVIE | TRACK_IN_PREVIEW, tkhd_payload) # Track Header Box + + mdhd_payload = u64.pack(creation_time) + mdhd_payload += u64.pack(modification_time) + mdhd_payload += u32.pack(timescale) + mdhd_payload += u64.pack(duration) + mdhd_payload += u16.pack(((ord(language[0]) - 0x60) << 10) | ((ord(language[1]) - 0x60) << 5) | (ord(language[2]) - 0x60)) + mdhd_payload += u16.pack(0) # pre defined + mdia_payload = full_box(b'mdhd', 1, 0, mdhd_payload) # Media Header Box + + hdlr_payload = u32.pack(0) # pre defined + if stream_type == 'audio': # handler type + hdlr_payload += b'soun' + hdlr_payload += u32.pack(0) * 3 # reserved + hdlr_payload += b'SoundHandler\0' # name + elif stream_type == 'video': + hdlr_payload += b'vide' + hdlr_payload += u32.pack(0) * 3 # reserved + hdlr_payload += b'VideoHandler\0' # name + elif stream_type == 'text': + hdlr_payload += b'subt' + hdlr_payload += u32.pack(0) * 3 # reserved + hdlr_payload += b'SubtitleHandler\0' # name + else: + assert False + mdia_payload += full_box(b'hdlr', 0, 0, hdlr_payload) # Handler Reference Box + + if stream_type == 'audio': + smhd_payload = s88.pack(0) # balance + smhd_payload += u16.pack(0) # reserved + media_header_box = full_box(b'smhd', 0, 0, smhd_payload) # Sound Media Header + elif stream_type == 'video': + vmhd_payload = u16.pack(0) # graphics mode + vmhd_payload += u16.pack(0) * 3 # opcolor + media_header_box = full_box(b'vmhd', 0, 1, vmhd_payload) # Video Media Header + elif stream_type == 'text': + media_header_box = full_box(b'sthd', 0, 0, b'') # Subtitle Media Header + else: + assert False + minf_payload = media_header_box + + dref_payload = u32.pack(1) # entry count + dref_payload += full_box(b'url ', 0, SELF_CONTAINED, b'') # Data Entry URL Box + dinf_payload = full_box(b'dref', 0, 0, dref_payload) # Data Reference Box + minf_payload += box(b'dinf', dinf_payload) # Data Information Box + + stsd_payload = u32.pack(1) # entry count + + sample_entry_payload = u8.pack(0) * 6 # reserved + sample_entry_payload += u16.pack(1) # data reference index + if stream_type == 'audio': + sample_entry_payload += u32.pack(0) * 2 # reserved + sample_entry_payload += u16.pack(params.get('channels', 2)) + sample_entry_payload += u16.pack(params.get('bits_per_sample', 16)) + sample_entry_payload += u16.pack(0) # pre defined + sample_entry_payload += u16.pack(0) # reserved + sample_entry_payload += u1616.pack(params['sampling_rate']) + + if fourcc == 'AACL': + sample_entry_box = box(b'mp4a', sample_entry_payload) + if fourcc == 'EC-3': + sample_entry_box = box(b'ec-3', sample_entry_payload) + elif stream_type == 'video': + sample_entry_payload += u16.pack(0) # pre defined + sample_entry_payload += u16.pack(0) # reserved + sample_entry_payload += u32.pack(0) * 3 # pre defined + sample_entry_payload += u16.pack(width) + sample_entry_payload += u16.pack(height) + sample_entry_payload += u1616.pack(0x48) # horiz resolution 72 dpi + sample_entry_payload += u1616.pack(0x48) # vert resolution 72 dpi + sample_entry_payload += u32.pack(0) # reserved + sample_entry_payload += u16.pack(1) # frame count + sample_entry_payload += u8.pack(0) * 32 # compressor name + sample_entry_payload += u16.pack(0x18) # depth + sample_entry_payload += s16.pack(-1) # pre defined + + codec_private_data = binascii.unhexlify(params['codec_private_data'].encode()) + if fourcc in ('H264', 'AVC1'): + sps, pps = codec_private_data.split(u32.pack(1))[1:] + avcc_payload = u8.pack(1) # configuration version + avcc_payload += sps[1:4] # avc profile indication + profile compatibility + avc level indication + avcc_payload += u8.pack(0xfc | (params.get('nal_unit_length_field', 4) - 1)) # complete representation (1) + reserved (11111) + length size minus one + avcc_payload += u8.pack(1) # reserved (0) + number of sps (0000001) + avcc_payload += u16.pack(len(sps)) + avcc_payload += sps + avcc_payload += u8.pack(1) # number of pps + avcc_payload += u16.pack(len(pps)) + avcc_payload += pps + sample_entry_payload += box(b'avcC', avcc_payload) # AVC Decoder Configuration Record + sample_entry_box = box(b'avc1', sample_entry_payload) # AVC Simple Entry + else: + assert False + elif stream_type == 'text': + if fourcc == 'TTML': + sample_entry_payload += b'http://www.w3.org/ns/ttml\0' # namespace + sample_entry_payload += b'\0' # schema location + sample_entry_payload += b'\0' # auxilary mime types(??) + sample_entry_box = box(b'stpp', sample_entry_payload) + else: + assert False + else: + assert False + stsd_payload += sample_entry_box + + stbl_payload = full_box(b'stsd', 0, 0, stsd_payload) # Sample Description Box + + stts_payload = u32.pack(0) # entry count + stbl_payload += full_box(b'stts', 0, 0, stts_payload) # Decoding Time to Sample Box + + stsc_payload = u32.pack(0) # entry count + stbl_payload += full_box(b'stsc', 0, 0, stsc_payload) # Sample To Chunk Box + + stco_payload = u32.pack(0) # entry count + stbl_payload += full_box(b'stco', 0, 0, stco_payload) # Chunk Offset Box + + minf_payload += box(b'stbl', stbl_payload) # Sample Table Box + + mdia_payload += box(b'minf', minf_payload) # Media Information Box + + trak_payload += box(b'mdia', mdia_payload) # Media Box + + moov_payload += box(b'trak', trak_payload) # Track Box + + mehd_payload = u64.pack(duration) + mvex_payload = full_box(b'mehd', 1, 0, mehd_payload) # Movie Extends Header Box + + trex_payload = u32.pack(track_id) # track id + trex_payload += u32.pack(1) # default sample description index + trex_payload += u32.pack(0) # default sample duration + trex_payload += u32.pack(0) # default sample size + trex_payload += u32.pack(0) # default sample flags + mvex_payload += full_box(b'trex', 0, 0, trex_payload) # Track Extends Box + + moov_payload += box(b'mvex', mvex_payload) # Movie Extends Box + stream.write(box(b'moov', moov_payload)) # Movie Box + + +def extract_box_data(data, box_sequence): + data_reader = io.BytesIO(data) + while True: + box_size = u32.unpack(data_reader.read(4))[0] + box_type = data_reader.read(4) + if box_type == box_sequence[0]: + box_data = data_reader.read(box_size - 8) + if len(box_sequence) == 1: + return box_data + return extract_box_data(box_data, box_sequence[1:]) + data_reader.seek(box_size - 8, 1) + + +class IsmFD(FragmentFD): + """ + Download segments in a ISM manifest + """ + + def real_download(self, filename, info_dict): + segments = info_dict['fragments'][:1] if self.params.get( + 'test', False) else info_dict['fragments'] + + ctx = { + 'filename': filename, + 'total_frags': len(segments), + } + + self._prepare_and_start_frag_download(ctx, info_dict) + + extra_state = ctx.setdefault('extra_state', { + 'ism_track_written': False, + }) + + skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True) + + frag_index = 0 + for i, segment in enumerate(segments): + frag_index += 1 + if frag_index <= ctx['fragment_index']: + continue + + retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry, + frag_index=frag_index, fatal=not skip_unavailable_fragments) + for retry in retry_manager: + try: + success = self._download_fragment(ctx, segment['url'], info_dict) + if not success: + return False + frag_content = self._read_fragment(ctx) + + if not extra_state['ism_track_written']: + tfhd_data = extract_box_data(frag_content, [b'moof', b'traf', b'tfhd']) + info_dict['_download_params']['track_id'] = u32.unpack(tfhd_data[4:8])[0] + write_piff_header(ctx['dest_stream'], info_dict['_download_params']) + extra_state['ism_track_written'] = True + self._append_fragment(ctx, frag_content) + except HTTPError as err: + retry.error = err + continue + + if retry_manager.error: + if not skip_unavailable_fragments: + return False + self.report_skip_fragment(frag_index) + + return self._finish_frag_download(ctx, info_dict) diff --git a/yt_dlp/downloader/mhtml.py b/yt_dlp/downloader/mhtml.py new file mode 100644 index 0000000..d977dce --- /dev/null +++ b/yt_dlp/downloader/mhtml.py @@ -0,0 +1,189 @@ +import io +import quopri +import re +import uuid + +from .fragment import FragmentFD +from ..compat import imghdr +from ..utils import escapeHTML, formatSeconds, srt_subtitles_timecode, urljoin +from ..version import __version__ as YT_DLP_VERSION + + +class MhtmlFD(FragmentFD): + _STYLESHEET = """\ +html, body { + margin: 0; + padding: 0; + height: 100vh; +} + +html { + overflow-y: scroll; + scroll-snap-type: y mandatory; +} + +body { + scroll-snap-type: y mandatory; + display: flex; + flex-flow: column; +} + +body > figure { + max-width: 100vw; + max-height: 100vh; + scroll-snap-align: center; +} + +body > figure > figcaption { + text-align: center; + height: 2.5em; +} + +body > figure > img { + display: block; + margin: auto; + max-width: 100%; + max-height: calc(100vh - 5em); +} +""" + _STYLESHEET = re.sub(r'\s+', ' ', _STYLESHEET) + _STYLESHEET = re.sub(r'\B \B|(?<=[\w\-]) (?=[^\w\-])|(?<=[^\w\-]) (?=[\w\-])', '', _STYLESHEET) + + @staticmethod + def _escape_mime(s): + return '=?utf-8?Q?' + (b''.join( + bytes((b,)) if b >= 0x20 else b'=%02X' % b + for b in quopri.encodestring(s.encode(), header=True) + )).decode('us-ascii') + '?=' + + def _gen_cid(self, i, fragment, frag_boundary): + return '%u.%s@yt-dlp.github.io.invalid' % (i, frag_boundary) + + def _gen_stub(self, *, fragments, frag_boundary, title): + output = io.StringIO() + + output.write(( + '<!DOCTYPE html>' + '<html>' + '<head>' + '' '<meta name="generator" content="yt-dlp {version}">' + '' '<title>{title}</title>' + '' '<style>{styles}</style>' + '<body>' + ).format( + version=escapeHTML(YT_DLP_VERSION), + styles=self._STYLESHEET, + title=escapeHTML(title) + )) + + t0 = 0 + for i, frag in enumerate(fragments): + output.write('<figure>') + try: + t1 = t0 + frag['duration'] + output.write(( + '<figcaption>Slide #{num}: {t0} – {t1} (duration: {duration})</figcaption>' + ).format( + num=i + 1, + t0=srt_subtitles_timecode(t0), + t1=srt_subtitles_timecode(t1), + duration=formatSeconds(frag['duration'], msec=True) + )) + except (KeyError, ValueError, TypeError): + t1 = None + output.write(( + '<figcaption>Slide #{num}</figcaption>' + ).format(num=i + 1)) + output.write('<img src="cid:{cid}">'.format( + cid=self._gen_cid(i, frag, frag_boundary))) + output.write('</figure>') + t0 = t1 + + return output.getvalue() + + def real_download(self, filename, info_dict): + fragment_base_url = info_dict.get('fragment_base_url') + fragments = info_dict['fragments'][:1] if self.params.get( + 'test', False) else info_dict['fragments'] + title = info_dict.get('title', info_dict['format_id']) + origin = info_dict.get('webpage_url', info_dict['url']) + + ctx = { + 'filename': filename, + 'total_frags': len(fragments), + } + + self._prepare_and_start_frag_download(ctx, info_dict) + + extra_state = ctx.setdefault('extra_state', { + 'header_written': False, + 'mime_boundary': str(uuid.uuid4()).replace('-', ''), + }) + + frag_boundary = extra_state['mime_boundary'] + + if not extra_state['header_written']: + stub = self._gen_stub( + fragments=fragments, + frag_boundary=frag_boundary, + title=title + ) + + ctx['dest_stream'].write(( + 'MIME-Version: 1.0\r\n' + 'From: <nowhere@yt-dlp.github.io.invalid>\r\n' + 'To: <nowhere@yt-dlp.github.io.invalid>\r\n' + 'Subject: {title}\r\n' + 'Content-type: multipart/related; ' + '' 'boundary="{boundary}"; ' + '' 'type="text/html"\r\n' + 'X.yt-dlp.Origin: {origin}\r\n' + '\r\n' + '--{boundary}\r\n' + 'Content-Type: text/html; charset=utf-8\r\n' + 'Content-Length: {length}\r\n' + '\r\n' + '{stub}\r\n' + ).format( + origin=origin, + boundary=frag_boundary, + length=len(stub), + title=self._escape_mime(title), + stub=stub + ).encode()) + extra_state['header_written'] = True + + for i, fragment in enumerate(fragments): + if (i + 1) <= ctx['fragment_index']: + continue + + fragment_url = fragment.get('url') + if not fragment_url: + assert fragment_base_url + fragment_url = urljoin(fragment_base_url, fragment['path']) + + success = self._download_fragment(ctx, fragment_url, info_dict) + if not success: + continue + frag_content = self._read_fragment(ctx) + + frag_header = io.BytesIO() + frag_header.write( + b'--%b\r\n' % frag_boundary.encode('us-ascii')) + frag_header.write( + b'Content-ID: <%b>\r\n' % self._gen_cid(i, fragment, frag_boundary).encode('us-ascii')) + frag_header.write( + b'Content-type: %b\r\n' % f'image/{imghdr.what(h=frag_content) or "jpeg"}'.encode()) + frag_header.write( + b'Content-length: %u\r\n' % len(frag_content)) + frag_header.write( + b'Content-location: %b\r\n' % fragment_url.encode('us-ascii')) + frag_header.write( + b'X.yt-dlp.Duration: %f\r\n' % fragment['duration']) + frag_header.write(b'\r\n') + self._append_fragment( + ctx, frag_header.getvalue() + frag_content + b'\r\n') + + ctx['dest_stream'].write( + b'--%b--\r\n\r\n' % frag_boundary.encode('us-ascii')) + return self._finish_frag_download(ctx, info_dict) diff --git a/yt_dlp/downloader/niconico.py b/yt_dlp/downloader/niconico.py new file mode 100644 index 0000000..fef8bff --- /dev/null +++ b/yt_dlp/downloader/niconico.py @@ -0,0 +1,140 @@ +import json +import threading +import time + +from . import get_suitable_downloader +from .common import FileDownloader +from .external import FFmpegFD +from ..networking import Request +from ..utils import DownloadError, str_or_none, try_get + + +class NiconicoDmcFD(FileDownloader): + """ Downloading niconico douga from DMC with heartbeat """ + + def real_download(self, filename, info_dict): + from ..extractor.niconico import NiconicoIE + + self.to_screen('[%s] Downloading from DMC' % self.FD_NAME) + ie = NiconicoIE(self.ydl) + info_dict, heartbeat_info_dict = ie._get_heartbeat_info(info_dict) + + fd = get_suitable_downloader(info_dict, params=self.params)(self.ydl, self.params) + + success = download_complete = False + timer = [None] + heartbeat_lock = threading.Lock() + heartbeat_url = heartbeat_info_dict['url'] + heartbeat_data = heartbeat_info_dict['data'].encode() + heartbeat_interval = heartbeat_info_dict.get('interval', 30) + + request = Request(heartbeat_url, heartbeat_data) + + def heartbeat(): + try: + self.ydl.urlopen(request).read() + except Exception: + self.to_screen('[%s] Heartbeat failed' % self.FD_NAME) + + with heartbeat_lock: + if not download_complete: + timer[0] = threading.Timer(heartbeat_interval, heartbeat) + timer[0].start() + + heartbeat_info_dict['ping']() + self.to_screen('[%s] Heartbeat with %d second interval ...' % (self.FD_NAME, heartbeat_interval)) + try: + heartbeat() + if type(fd).__name__ == 'HlsFD': + info_dict.update(ie._extract_m3u8_formats(info_dict['url'], info_dict['id'])[0]) + success = fd.real_download(filename, info_dict) + finally: + if heartbeat_lock: + with heartbeat_lock: + timer[0].cancel() + download_complete = True + return success + + +class NiconicoLiveFD(FileDownloader): + """ Downloads niconico live without being stopped """ + + def real_download(self, filename, info_dict): + video_id = info_dict['video_id'] + ws_url = info_dict['url'] + ws_extractor = info_dict['ws'] + ws_origin_host = info_dict['origin'] + live_quality = info_dict.get('live_quality', 'high') + live_latency = info_dict.get('live_latency', 'high') + dl = FFmpegFD(self.ydl, self.params or {}) + + new_info_dict = info_dict.copy() + new_info_dict.update({ + 'protocol': 'm3u8', + }) + + def communicate_ws(reconnect): + if reconnect: + ws = self.ydl.urlopen(Request(ws_url, headers={'Origin': f'https://{ws_origin_host}'})) + if self.ydl.params.get('verbose', False): + self.to_screen('[debug] Sending startWatching request') + ws.send(json.dumps({ + 'type': 'startWatching', + 'data': { + 'stream': { + 'quality': live_quality, + 'protocol': 'hls+fmp4', + 'latency': live_latency, + 'chasePlay': False + }, + 'room': { + 'protocol': 'webSocket', + 'commentable': True + }, + 'reconnect': True, + } + })) + else: + ws = ws_extractor + with ws: + while True: + recv = ws.recv() + if not recv: + continue + data = json.loads(recv) + if not data or not isinstance(data, dict): + continue + if data.get('type') == 'ping': + # pong back + ws.send(r'{"type":"pong"}') + ws.send(r'{"type":"keepSeat"}') + elif data.get('type') == 'disconnect': + self.write_debug(data) + return True + elif data.get('type') == 'error': + self.write_debug(data) + message = try_get(data, lambda x: x['body']['code'], str) or recv + return DownloadError(message) + elif self.ydl.params.get('verbose', False): + if len(recv) > 100: + recv = recv[:100] + '...' + self.to_screen('[debug] Server said: %s' % recv) + + def ws_main(): + reconnect = False + while True: + try: + ret = communicate_ws(reconnect) + if ret is True: + return + except BaseException as e: + self.to_screen('[%s] %s: Connection error occured, reconnecting after 10 seconds: %s' % ('niconico:live', video_id, str_or_none(e))) + time.sleep(10) + continue + finally: + reconnect = True + + thread = threading.Thread(target=ws_main, daemon=True) + thread.start() + + return dl.download(filename, new_info_dict) diff --git a/yt_dlp/downloader/rtmp.py b/yt_dlp/downloader/rtmp.py new file mode 100644 index 0000000..0e09525 --- /dev/null +++ b/yt_dlp/downloader/rtmp.py @@ -0,0 +1,213 @@ +import os +import re +import subprocess +import time + +from .common import FileDownloader +from ..utils import ( + Popen, + check_executable, + encodeArgument, + encodeFilename, + get_exe_version, +) + + +def rtmpdump_version(): + return get_exe_version( + 'rtmpdump', ['--help'], r'(?i)RTMPDump\s*v?([0-9a-zA-Z._-]+)') + + +class RtmpFD(FileDownloader): + def real_download(self, filename, info_dict): + def run_rtmpdump(args): + start = time.time() + resume_percent = None + resume_downloaded_data_len = None + proc = Popen(args, stderr=subprocess.PIPE) + cursor_in_new_line = True + proc_stderr_closed = False + try: + while not proc_stderr_closed: + # read line from stderr + line = '' + while True: + char = proc.stderr.read(1) + if not char: + proc_stderr_closed = True + break + if char in [b'\r', b'\n']: + break + line += char.decode('ascii', 'replace') + if not line: + # proc_stderr_closed is True + continue + mobj = re.search(r'([0-9]+\.[0-9]{3}) kB / [0-9]+\.[0-9]{2} sec \(([0-9]{1,2}\.[0-9])%\)', line) + if mobj: + downloaded_data_len = int(float(mobj.group(1)) * 1024) + percent = float(mobj.group(2)) + if not resume_percent: + resume_percent = percent + resume_downloaded_data_len = downloaded_data_len + time_now = time.time() + eta = self.calc_eta(start, time_now, 100 - resume_percent, percent - resume_percent) + speed = self.calc_speed(start, time_now, downloaded_data_len - resume_downloaded_data_len) + data_len = None + if percent > 0: + data_len = int(downloaded_data_len * 100 / percent) + self._hook_progress({ + 'status': 'downloading', + 'downloaded_bytes': downloaded_data_len, + 'total_bytes_estimate': data_len, + 'tmpfilename': tmpfilename, + 'filename': filename, + 'eta': eta, + 'elapsed': time_now - start, + 'speed': speed, + }, info_dict) + cursor_in_new_line = False + else: + # no percent for live streams + mobj = re.search(r'([0-9]+\.[0-9]{3}) kB / [0-9]+\.[0-9]{2} sec', line) + if mobj: + downloaded_data_len = int(float(mobj.group(1)) * 1024) + time_now = time.time() + speed = self.calc_speed(start, time_now, downloaded_data_len) + self._hook_progress({ + 'downloaded_bytes': downloaded_data_len, + 'tmpfilename': tmpfilename, + 'filename': filename, + 'status': 'downloading', + 'elapsed': time_now - start, + 'speed': speed, + }, info_dict) + cursor_in_new_line = False + elif self.params.get('verbose', False): + if not cursor_in_new_line: + self.to_screen('') + cursor_in_new_line = True + self.to_screen('[rtmpdump] ' + line) + if not cursor_in_new_line: + self.to_screen('') + return proc.wait() + except BaseException: # Including KeyboardInterrupt + proc.kill(timeout=None) + raise + + url = info_dict['url'] + player_url = info_dict.get('player_url') + page_url = info_dict.get('page_url') + app = info_dict.get('app') + play_path = info_dict.get('play_path') + tc_url = info_dict.get('tc_url') + flash_version = info_dict.get('flash_version') + live = info_dict.get('rtmp_live', False) + conn = info_dict.get('rtmp_conn') + protocol = info_dict.get('rtmp_protocol') + real_time = info_dict.get('rtmp_real_time', False) + no_resume = info_dict.get('no_resume', False) + continue_dl = self.params.get('continuedl', True) + + self.report_destination(filename) + tmpfilename = self.temp_name(filename) + test = self.params.get('test', False) + + # Check for rtmpdump first + if not check_executable('rtmpdump', ['-h']): + self.report_error('RTMP download detected but "rtmpdump" could not be run. Please install') + return False + + # Download using rtmpdump. rtmpdump returns exit code 2 when + # the connection was interrupted and resuming appears to be + # possible. This is part of rtmpdump's normal usage, AFAIK. + basic_args = [ + 'rtmpdump', '--verbose', '-r', url, + '-o', tmpfilename] + if player_url is not None: + basic_args += ['--swfVfy', player_url] + if page_url is not None: + basic_args += ['--pageUrl', page_url] + if app is not None: + basic_args += ['--app', app] + if play_path is not None: + basic_args += ['--playpath', play_path] + if tc_url is not None: + basic_args += ['--tcUrl', tc_url] + if test: + basic_args += ['--stop', '1'] + if flash_version is not None: + basic_args += ['--flashVer', flash_version] + if live: + basic_args += ['--live'] + if isinstance(conn, list): + for entry in conn: + basic_args += ['--conn', entry] + elif isinstance(conn, str): + basic_args += ['--conn', conn] + if protocol is not None: + basic_args += ['--protocol', protocol] + if real_time: + basic_args += ['--realtime'] + + args = basic_args + if not no_resume and continue_dl and not live: + args += ['--resume'] + if not live and continue_dl: + args += ['--skip', '1'] + + args = [encodeArgument(a) for a in args] + + self._debug_cmd(args, exe='rtmpdump') + + RD_SUCCESS = 0 + RD_FAILED = 1 + RD_INCOMPLETE = 2 + RD_NO_CONNECT = 3 + + started = time.time() + + try: + retval = run_rtmpdump(args) + except KeyboardInterrupt: + if not info_dict.get('is_live'): + raise + retval = RD_SUCCESS + self.to_screen('\n[rtmpdump] Interrupted by user') + + if retval == RD_NO_CONNECT: + self.report_error('[rtmpdump] Could not connect to RTMP server.') + return False + + while retval in (RD_INCOMPLETE, RD_FAILED) and not test and not live: + prevsize = os.path.getsize(encodeFilename(tmpfilename)) + self.to_screen('[rtmpdump] Downloaded %s bytes' % prevsize) + time.sleep(5.0) # This seems to be needed + args = basic_args + ['--resume'] + if retval == RD_FAILED: + args += ['--skip', '1'] + args = [encodeArgument(a) for a in args] + retval = run_rtmpdump(args) + cursize = os.path.getsize(encodeFilename(tmpfilename)) + if prevsize == cursize and retval == RD_FAILED: + break + # Some rtmp streams seem abort after ~ 99.8%. Don't complain for those + if prevsize == cursize and retval == RD_INCOMPLETE and cursize > 1024: + self.to_screen('[rtmpdump] Could not download the whole video. This can happen for some advertisements.') + retval = RD_SUCCESS + break + if retval == RD_SUCCESS or (test and retval == RD_INCOMPLETE): + fsize = os.path.getsize(encodeFilename(tmpfilename)) + self.to_screen('[rtmpdump] Downloaded %s bytes' % fsize) + self.try_rename(tmpfilename, filename) + self._hook_progress({ + 'downloaded_bytes': fsize, + 'total_bytes': fsize, + 'filename': filename, + 'status': 'finished', + 'elapsed': time.time() - started, + }, info_dict) + return True + else: + self.to_stderr('\n') + self.report_error('rtmpdump exited with code %d' % retval) + return False diff --git a/yt_dlp/downloader/rtsp.py b/yt_dlp/downloader/rtsp.py new file mode 100644 index 0000000..e89269f --- /dev/null +++ b/yt_dlp/downloader/rtsp.py @@ -0,0 +1,42 @@ +import os +import subprocess + +from .common import FileDownloader +from ..utils import check_executable, encodeFilename + + +class RtspFD(FileDownloader): + def real_download(self, filename, info_dict): + url = info_dict['url'] + self.report_destination(filename) + tmpfilename = self.temp_name(filename) + + if check_executable('mplayer', ['-h']): + args = [ + 'mplayer', '-really-quiet', '-vo', 'null', '-vc', 'dummy', + '-dumpstream', '-dumpfile', tmpfilename, url] + elif check_executable('mpv', ['-h']): + args = [ + 'mpv', '-really-quiet', '--vo=null', '--stream-dump=' + tmpfilename, url] + else: + self.report_error('MMS or RTSP download detected but neither "mplayer" nor "mpv" could be run. Please install one') + return False + + self._debug_cmd(args) + + retval = subprocess.call(args) + if retval == 0: + fsize = os.path.getsize(encodeFilename(tmpfilename)) + self.to_screen(f'\r[{args[0]}] {fsize} bytes') + self.try_rename(tmpfilename, filename) + self._hook_progress({ + 'downloaded_bytes': fsize, + 'total_bytes': fsize, + 'filename': filename, + 'status': 'finished', + }, info_dict) + return True + else: + self.to_stderr('\n') + self.report_error('%s exited with code %d' % (args[0], retval)) + return False diff --git a/yt_dlp/downloader/websocket.py b/yt_dlp/downloader/websocket.py new file mode 100644 index 0000000..6837ff1 --- /dev/null +++ b/yt_dlp/downloader/websocket.py @@ -0,0 +1,53 @@ +import asyncio +import contextlib +import os +import signal +import threading + +from .common import FileDownloader +from .external import FFmpegFD +from ..dependencies import websockets + + +class FFmpegSinkFD(FileDownloader): + """ A sink to ffmpeg for downloading fragments in any form """ + + def real_download(self, filename, info_dict): + info_copy = info_dict.copy() + info_copy['url'] = '-' + + async def call_conn(proc, stdin): + try: + await self.real_connection(stdin, info_dict) + except OSError: + pass + finally: + with contextlib.suppress(OSError): + stdin.flush() + stdin.close() + os.kill(os.getpid(), signal.SIGINT) + + class FFmpegStdinFD(FFmpegFD): + @classmethod + def get_basename(cls): + return FFmpegFD.get_basename() + + def on_process_started(self, proc, stdin): + thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), )) + thread.start() + + return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy) + + async def real_connection(self, sink, info_dict): + """ Override this in subclasses """ + raise NotImplementedError('This method must be implemented by subclasses') + + +class WebSocketFragmentFD(FFmpegSinkFD): + async def real_connection(self, sink, info_dict): + async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws: + while True: + recv = await ws.recv() + if isinstance(recv, str): + recv = recv.encode('utf8') + sink.write(recv) diff --git a/yt_dlp/downloader/youtube_live_chat.py b/yt_dlp/downloader/youtube_live_chat.py new file mode 100644 index 0000000..c7a8637 --- /dev/null +++ b/yt_dlp/downloader/youtube_live_chat.py @@ -0,0 +1,228 @@ +import json +import time + +from .fragment import FragmentFD +from ..networking.exceptions import HTTPError +from ..utils import ( + RegexNotFoundError, + RetryManager, + dict_get, + int_or_none, + try_get, +) +from ..utils.networking import HTTPHeaderDict + + +class YoutubeLiveChatFD(FragmentFD): + """ Downloads YouTube live chats fragment by fragment """ + + def real_download(self, filename, info_dict): + video_id = info_dict['video_id'] + self.to_screen('[%s] Downloading live chat' % self.FD_NAME) + if not self.params.get('skip_download') and info_dict['protocol'] == 'youtube_live_chat': + self.report_warning('Live chat download runs until the livestream ends. ' + 'If you wish to download the video simultaneously, run a separate yt-dlp instance') + + test = self.params.get('test', False) + + ctx = { + 'filename': filename, + 'live': True, + 'total_frags': None, + } + + from ..extractor.youtube import YoutubeBaseInfoExtractor + + ie = YoutubeBaseInfoExtractor(self.ydl) + + start_time = int(time.time() * 1000) + + def dl_fragment(url, data=None, headers=None): + http_headers = HTTPHeaderDict(info_dict.get('http_headers'), headers) + return self._download_fragment(ctx, url, info_dict, http_headers, data) + + def parse_actions_replay(live_chat_continuation): + offset = continuation_id = click_tracking_params = None + processed_fragment = bytearray() + for action in live_chat_continuation.get('actions', []): + if 'replayChatItemAction' in action: + replay_chat_item_action = action['replayChatItemAction'] + offset = int(replay_chat_item_action['videoOffsetTimeMsec']) + processed_fragment.extend( + json.dumps(action, ensure_ascii=False).encode() + b'\n') + if offset is not None: + continuation = try_get( + live_chat_continuation, + lambda x: x['continuations'][0]['liveChatReplayContinuationData'], dict) + if continuation: + continuation_id = continuation.get('continuation') + click_tracking_params = continuation.get('clickTrackingParams') + self._append_fragment(ctx, processed_fragment) + return continuation_id, offset, click_tracking_params + + def try_refresh_replay_beginning(live_chat_continuation): + # choose the second option that contains the unfiltered live chat replay + refresh_continuation = try_get( + live_chat_continuation, + lambda x: x['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData'], dict) + if refresh_continuation: + # no data yet but required to call _append_fragment + self._append_fragment(ctx, b'') + refresh_continuation_id = refresh_continuation.get('continuation') + offset = 0 + click_tracking_params = refresh_continuation.get('trackingParams') + return refresh_continuation_id, offset, click_tracking_params + return parse_actions_replay(live_chat_continuation) + + live_offset = 0 + + def parse_actions_live(live_chat_continuation): + nonlocal live_offset + continuation_id = click_tracking_params = None + processed_fragment = bytearray() + for action in live_chat_continuation.get('actions', []): + timestamp = self.parse_live_timestamp(action) + if timestamp is not None: + live_offset = timestamp - start_time + # compatibility with replay format + pseudo_action = { + 'replayChatItemAction': {'actions': [action]}, + 'videoOffsetTimeMsec': str(live_offset), + 'isLive': True, + } + processed_fragment.extend( + json.dumps(pseudo_action, ensure_ascii=False).encode() + b'\n') + continuation_data_getters = [ + lambda x: x['continuations'][0]['invalidationContinuationData'], + lambda x: x['continuations'][0]['timedContinuationData'], + ] + continuation_data = try_get(live_chat_continuation, continuation_data_getters, dict) + if continuation_data: + continuation_id = continuation_data.get('continuation') + click_tracking_params = continuation_data.get('clickTrackingParams') + timeout_ms = int_or_none(continuation_data.get('timeoutMs')) + if timeout_ms is not None: + time.sleep(timeout_ms / 1000) + self._append_fragment(ctx, processed_fragment) + return continuation_id, live_offset, click_tracking_params + + def download_and_parse_fragment(url, frag_index, request_data=None, headers=None): + for retry in RetryManager(self.params.get('fragment_retries'), self.report_retry, frag_index=frag_index): + try: + success = dl_fragment(url, request_data, headers) + if not success: + return False, None, None, None + raw_fragment = self._read_fragment(ctx) + try: + data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace')) + except RegexNotFoundError: + data = None + if not data: + data = json.loads(raw_fragment) + live_chat_continuation = try_get( + data, + lambda x: x['continuationContents']['liveChatContinuation'], dict) or {} + + func = (info_dict['protocol'] == 'youtube_live_chat' and parse_actions_live + or frag_index == 1 and try_refresh_replay_beginning + or parse_actions_replay) + return (True, *func(live_chat_continuation)) + except HTTPError as err: + retry.error = err + continue + return False, None, None, None + + self._prepare_and_start_frag_download(ctx, info_dict) + + success = dl_fragment(info_dict['url']) + if not success: + return False + raw_fragment = self._read_fragment(ctx) + try: + data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace')) + except RegexNotFoundError: + return False + continuation_id = try_get( + data, + lambda x: x['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation']) + # no data yet but required to call _append_fragment + self._append_fragment(ctx, b'') + + ytcfg = ie.extract_ytcfg(video_id, raw_fragment.decode('utf-8', 'replace')) + + if not ytcfg: + return False + api_key = try_get(ytcfg, lambda x: x['INNERTUBE_API_KEY']) + innertube_context = try_get(ytcfg, lambda x: x['INNERTUBE_CONTEXT']) + if not api_key or not innertube_context: + return False + visitor_data = try_get(innertube_context, lambda x: x['client']['visitorData'], str) + if info_dict['protocol'] == 'youtube_live_chat_replay': + url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay?key=' + api_key + chat_page_url = 'https://www.youtube.com/live_chat_replay?continuation=' + continuation_id + elif info_dict['protocol'] == 'youtube_live_chat': + url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key=' + api_key + chat_page_url = 'https://www.youtube.com/live_chat?continuation=' + continuation_id + + frag_index = offset = 0 + click_tracking_params = None + while continuation_id is not None: + frag_index += 1 + request_data = { + 'context': innertube_context, + 'continuation': continuation_id, + } + if frag_index > 1: + request_data['currentPlayerState'] = {'playerOffsetMs': str(max(offset - 5000, 0))} + if click_tracking_params: + request_data['context']['clickTracking'] = {'clickTrackingParams': click_tracking_params} + headers = ie.generate_api_headers(ytcfg=ytcfg, visitor_data=visitor_data) + headers.update({'content-type': 'application/json'}) + fragment_request_data = json.dumps(request_data, ensure_ascii=False).encode() + b'\n' + success, continuation_id, offset, click_tracking_params = download_and_parse_fragment( + url, frag_index, fragment_request_data, headers) + else: + success, continuation_id, offset, click_tracking_params = download_and_parse_fragment( + chat_page_url, frag_index) + if not success: + return False + if test: + break + + return self._finish_frag_download(ctx, info_dict) + + @staticmethod + def parse_live_timestamp(action): + action_content = dict_get( + action, + ['addChatItemAction', 'addLiveChatTickerItemAction', 'addBannerToLiveChatCommand']) + if not isinstance(action_content, dict): + return None + item = dict_get(action_content, ['item', 'bannerRenderer']) + if not isinstance(item, dict): + return None + renderer = dict_get(item, [ + # text + 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer', + 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer', + # ticker + 'liveChatTickerPaidMessageItemRenderer', + 'liveChatTickerSponsorItemRenderer', + # banner + 'liveChatBannerRenderer', + ]) + if not isinstance(renderer, dict): + return None + parent_item_getters = [ + lambda x: x['showItemEndpoint']['showLiveChatItemEndpoint']['renderer'], + lambda x: x['contents'], + ] + parent_item = try_get(renderer, parent_item_getters, dict) + if parent_item: + renderer = dict_get(parent_item, [ + 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer', + 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer', + ]) + if not isinstance(renderer, dict): + return None + return int_or_none(renderer.get('timestampUsec'), 1000) |