summaryrefslogtreecommitdiffstats
path: root/yt_dlp/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/downloader')
-rw-r--r--yt_dlp/downloader/__init__.py131
-rw-r--r--yt_dlp/downloader/common.py486
-rw-r--r--yt_dlp/downloader/dash.py90
-rw-r--r--yt_dlp/downloader/external.py664
-rw-r--r--yt_dlp/downloader/f4m.py427
-rw-r--r--yt_dlp/downloader/fc2.py46
-rw-r--r--yt_dlp/downloader/fragment.py527
-rw-r--r--yt_dlp/downloader/hls.py378
-rw-r--r--yt_dlp/downloader/http.py383
-rw-r--r--yt_dlp/downloader/ism.py283
-rw-r--r--yt_dlp/downloader/mhtml.py189
-rw-r--r--yt_dlp/downloader/niconico.py140
-rw-r--r--yt_dlp/downloader/rtmp.py213
-rw-r--r--yt_dlp/downloader/rtsp.py42
-rw-r--r--yt_dlp/downloader/websocket.py53
-rw-r--r--yt_dlp/downloader/youtube_live_chat.py228
16 files changed, 4280 insertions, 0 deletions
diff --git a/yt_dlp/downloader/__init__.py b/yt_dlp/downloader/__init__.py
new file mode 100644
index 0000000..51a9f28
--- /dev/null
+++ b/yt_dlp/downloader/__init__.py
@@ -0,0 +1,131 @@
+from ..utils import NO_DEFAULT, determine_protocol
+
+
+def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=None, to_stdout=False):
+ info_dict['protocol'] = determine_protocol(info_dict)
+ info_copy = info_dict.copy()
+ info_copy['to_stdout'] = to_stdout
+
+ protocols = (protocol or info_copy['protocol']).split('+')
+ downloaders = [_get_suitable_downloader(info_copy, proto, params, default) for proto in protocols]
+
+ if set(downloaders) == {FFmpegFD} and FFmpegFD.can_merge_formats(info_copy, params):
+ return FFmpegFD
+ elif (set(downloaders) == {DashSegmentsFD}
+ and not (to_stdout and len(protocols) > 1)
+ and set(protocols) == {'http_dash_segments_generator'}):
+ return DashSegmentsFD
+ elif len(downloaders) == 1:
+ return downloaders[0]
+ return None
+
+
+# Some of these require get_suitable_downloader
+from .common import FileDownloader
+from .dash import DashSegmentsFD
+from .external import FFmpegFD, get_external_downloader
+from .f4m import F4mFD
+from .fc2 import FC2LiveFD
+from .hls import HlsFD
+from .http import HttpFD
+from .ism import IsmFD
+from .mhtml import MhtmlFD
+from .niconico import NiconicoDmcFD, NiconicoLiveFD
+from .rtmp import RtmpFD
+from .rtsp import RtspFD
+from .websocket import WebSocketFragmentFD
+from .youtube_live_chat import YoutubeLiveChatFD
+
+PROTOCOL_MAP = {
+ 'rtmp': RtmpFD,
+ 'rtmpe': RtmpFD,
+ 'rtmp_ffmpeg': FFmpegFD,
+ 'm3u8_native': HlsFD,
+ 'm3u8': FFmpegFD,
+ 'mms': RtspFD,
+ 'rtsp': RtspFD,
+ 'f4m': F4mFD,
+ 'http_dash_segments': DashSegmentsFD,
+ 'http_dash_segments_generator': DashSegmentsFD,
+ 'ism': IsmFD,
+ 'mhtml': MhtmlFD,
+ 'niconico_dmc': NiconicoDmcFD,
+ 'niconico_live': NiconicoLiveFD,
+ 'fc2_live': FC2LiveFD,
+ 'websocket_frag': WebSocketFragmentFD,
+ 'youtube_live_chat': YoutubeLiveChatFD,
+ 'youtube_live_chat_replay': YoutubeLiveChatFD,
+}
+
+
+def shorten_protocol_name(proto, simplify=False):
+ short_protocol_names = {
+ 'm3u8_native': 'm3u8',
+ 'm3u8': 'm3u8F',
+ 'rtmp_ffmpeg': 'rtmpF',
+ 'http_dash_segments': 'dash',
+ 'http_dash_segments_generator': 'dashG',
+ 'niconico_dmc': 'dmc',
+ 'websocket_frag': 'WSfrag',
+ }
+ if simplify:
+ short_protocol_names.update({
+ 'https': 'http',
+ 'ftps': 'ftp',
+ 'm3u8': 'm3u8', # Reverse above m3u8 mapping
+ 'm3u8_native': 'm3u8',
+ 'http_dash_segments_generator': 'dash',
+ 'rtmp_ffmpeg': 'rtmp',
+ 'm3u8_frag_urls': 'm3u8',
+ 'dash_frag_urls': 'dash',
+ })
+ return short_protocol_names.get(proto, proto)
+
+
+def _get_suitable_downloader(info_dict, protocol, params, default):
+ """Get the downloader class that can handle the info dict."""
+ if default is NO_DEFAULT:
+ default = HttpFD
+
+ if (info_dict.get('section_start') or info_dict.get('section_end')) and FFmpegFD.can_download(info_dict):
+ return FFmpegFD
+
+ info_dict['protocol'] = protocol
+ downloaders = params.get('external_downloader')
+ external_downloader = (
+ downloaders if isinstance(downloaders, str) or downloaders is None
+ else downloaders.get(shorten_protocol_name(protocol, True), downloaders.get('default')))
+
+ if external_downloader is None:
+ if info_dict['to_stdout'] and FFmpegFD.can_merge_formats(info_dict, params):
+ return FFmpegFD
+ elif external_downloader.lower() != 'native':
+ ed = get_external_downloader(external_downloader)
+ if ed.can_download(info_dict, external_downloader):
+ return ed
+
+ if protocol == 'http_dash_segments':
+ if info_dict.get('is_live') and (external_downloader or '').lower() != 'native':
+ return FFmpegFD
+
+ if protocol in ('m3u8', 'm3u8_native'):
+ if info_dict.get('is_live'):
+ return FFmpegFD
+ elif (external_downloader or '').lower() == 'native':
+ return HlsFD
+ elif protocol == 'm3u8_native' and get_suitable_downloader(
+ info_dict, params, None, protocol='m3u8_frag_urls', to_stdout=info_dict['to_stdout']):
+ return HlsFD
+ elif params.get('hls_prefer_native') is True:
+ return HlsFD
+ elif params.get('hls_prefer_native') is False:
+ return FFmpegFD
+
+ return PROTOCOL_MAP.get(protocol, default)
+
+
+__all__ = [
+ 'FileDownloader',
+ 'get_suitable_downloader',
+ 'shorten_protocol_name',
+]
diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py
new file mode 100644
index 0000000..b71d7ee
--- /dev/null
+++ b/yt_dlp/downloader/common.py
@@ -0,0 +1,486 @@
+import contextlib
+import errno
+import functools
+import os
+import random
+import re
+import time
+
+from ..minicurses import (
+ BreaklineStatusPrinter,
+ MultilineLogger,
+ MultilinePrinter,
+ QuietMultilinePrinter,
+)
+from ..utils import (
+ IDENTITY,
+ NO_DEFAULT,
+ LockingUnsupportedError,
+ Namespace,
+ RetryManager,
+ classproperty,
+ decodeArgument,
+ deprecation_warning,
+ encodeFilename,
+ format_bytes,
+ join_nonempty,
+ parse_bytes,
+ remove_start,
+ sanitize_open,
+ shell_quote,
+ timeconvert,
+ timetuple_from_msec,
+ try_call,
+)
+
+
+class FileDownloader:
+ """File Downloader class.
+
+ File downloader objects are the ones responsible of downloading the
+ actual video file and writing it to disk.
+
+ File downloaders accept a lot of parameters. In order not to saturate
+ the object constructor with arguments, it receives a dictionary of
+ options instead.
+
+ Available options:
+
+ verbose: Print additional info to stdout.
+ quiet: Do not print messages to stdout.
+ ratelimit: Download speed limit, in bytes/sec.
+ throttledratelimit: Assume the download is being throttled below this speed (bytes/sec)
+ retries: Number of times to retry for expected network errors.
+ Default is 0 for API, but 10 for CLI
+ file_access_retries: Number of times to retry on file access error (default: 3)
+ buffersize: Size of download buffer in bytes.
+ noresizebuffer: Do not automatically resize the download buffer.
+ continuedl: Try to continue downloads if possible.
+ noprogress: Do not print the progress bar.
+ nopart: Do not use temporary .part files.
+ updatetime: Use the Last-modified header to set output file timestamps.
+ test: Download only first bytes to test the downloader.
+ min_filesize: Skip files smaller than this size
+ max_filesize: Skip files larger than this size
+ xattr_set_filesize: Set ytdl.filesize user xattribute with expected size.
+ external_downloader_args: A dictionary of downloader keys (in lower case)
+ and a list of additional command-line arguments for the
+ executable. Use 'default' as the name for arguments to be
+ passed to all downloaders. For compatibility with youtube-dl,
+ a single list of args can also be used
+ hls_use_mpegts: Use the mpegts container for HLS videos.
+ http_chunk_size: Size of a chunk for chunk-based HTTP downloading. May be
+ useful for bypassing bandwidth throttling imposed by
+ a webserver (experimental)
+ progress_template: See YoutubeDL.py
+ retry_sleep_functions: See YoutubeDL.py
+
+ Subclasses of this one must re-define the real_download method.
+ """
+
+ _TEST_FILE_SIZE = 10241
+ params = None
+
+ def __init__(self, ydl, params):
+ """Create a FileDownloader object with the given options."""
+ self._set_ydl(ydl)
+ self._progress_hooks = []
+ self.params = params
+ self._prepare_multiline_status()
+ self.add_progress_hook(self.report_progress)
+
+ def _set_ydl(self, ydl):
+ self.ydl = ydl
+
+ for func in (
+ 'deprecation_warning',
+ 'deprecated_feature',
+ 'report_error',
+ 'report_file_already_downloaded',
+ 'report_warning',
+ 'to_console_title',
+ 'to_stderr',
+ 'trouble',
+ 'write_debug',
+ ):
+ if not hasattr(self, func):
+ setattr(self, func, getattr(ydl, func))
+
+ def to_screen(self, *args, **kargs):
+ self.ydl.to_screen(*args, quiet=self.params.get('quiet'), **kargs)
+
+ __to_screen = to_screen
+
+ @classproperty
+ def FD_NAME(cls):
+ return re.sub(r'(?<=[a-z])(?=[A-Z])', '_', cls.__name__[:-2]).lower()
+
+ @staticmethod
+ def format_seconds(seconds):
+ if seconds is None:
+ return ' Unknown'
+ time = timetuple_from_msec(seconds * 1000)
+ if time.hours > 99:
+ return '--:--:--'
+ return '%02d:%02d:%02d' % time[:-1]
+
+ @classmethod
+ def format_eta(cls, seconds):
+ return f'{remove_start(cls.format_seconds(seconds), "00:"):>8s}'
+
+ @staticmethod
+ def calc_percent(byte_counter, data_len):
+ if data_len is None:
+ return None
+ return float(byte_counter) / float(data_len) * 100.0
+
+ @staticmethod
+ def format_percent(percent):
+ return ' N/A%' if percent is None else f'{percent:>5.1f}%'
+
+ @classmethod
+ def calc_eta(cls, start_or_rate, now_or_remaining, total=NO_DEFAULT, current=NO_DEFAULT):
+ if total is NO_DEFAULT:
+ rate, remaining = start_or_rate, now_or_remaining
+ if None in (rate, remaining):
+ return None
+ return int(float(remaining) / rate)
+
+ start, now = start_or_rate, now_or_remaining
+ if total is None:
+ return None
+ if now is None:
+ now = time.time()
+ rate = cls.calc_speed(start, now, current)
+ return rate and int((float(total) - float(current)) / rate)
+
+ @staticmethod
+ def calc_speed(start, now, bytes):
+ dif = now - start
+ if bytes == 0 or dif < 0.001: # One millisecond
+ return None
+ return float(bytes) / dif
+
+ @staticmethod
+ def format_speed(speed):
+ return ' Unknown B/s' if speed is None else f'{format_bytes(speed):>10s}/s'
+
+ @staticmethod
+ def format_retries(retries):
+ return 'inf' if retries == float('inf') else int(retries)
+
+ @staticmethod
+ def filesize_or_none(unencoded_filename):
+ if os.path.isfile(unencoded_filename):
+ return os.path.getsize(unencoded_filename)
+ return 0
+
+ @staticmethod
+ def best_block_size(elapsed_time, bytes):
+ new_min = max(bytes / 2.0, 1.0)
+ new_max = min(max(bytes * 2.0, 1.0), 4194304) # Do not surpass 4 MB
+ if elapsed_time < 0.001:
+ return int(new_max)
+ rate = bytes / elapsed_time
+ if rate > new_max:
+ return int(new_max)
+ if rate < new_min:
+ return int(new_min)
+ return int(rate)
+
+ @staticmethod
+ def parse_bytes(bytestr):
+ """Parse a string indicating a byte quantity into an integer."""
+ deprecation_warning('yt_dlp.FileDownloader.parse_bytes is deprecated and '
+ 'may be removed in the future. Use yt_dlp.utils.parse_bytes instead')
+ return parse_bytes(bytestr)
+
+ def slow_down(self, start_time, now, byte_counter):
+ """Sleep if the download speed is over the rate limit."""
+ rate_limit = self.params.get('ratelimit')
+ if rate_limit is None or byte_counter == 0:
+ return
+ if now is None:
+ now = time.time()
+ elapsed = now - start_time
+ if elapsed <= 0.0:
+ return
+ speed = float(byte_counter) / elapsed
+ if speed > rate_limit:
+ sleep_time = float(byte_counter) / rate_limit - elapsed
+ if sleep_time > 0:
+ time.sleep(sleep_time)
+
+ def temp_name(self, filename):
+ """Returns a temporary filename for the given filename."""
+ if self.params.get('nopart', False) or filename == '-' or \
+ (os.path.exists(encodeFilename(filename)) and not os.path.isfile(encodeFilename(filename))):
+ return filename
+ return filename + '.part'
+
+ def undo_temp_name(self, filename):
+ if filename.endswith('.part'):
+ return filename[:-len('.part')]
+ return filename
+
+ def ytdl_filename(self, filename):
+ return filename + '.ytdl'
+
+ def wrap_file_access(action, *, fatal=False):
+ def error_callback(err, count, retries, *, fd):
+ return RetryManager.report_retry(
+ err, count, retries, info=fd.__to_screen,
+ warn=lambda e: (time.sleep(0.01), fd.to_screen(f'[download] Unable to {action} file: {e}')),
+ error=None if fatal else lambda e: fd.report_error(f'Unable to {action} file: {e}'),
+ sleep_func=fd.params.get('retry_sleep_functions', {}).get('file_access'))
+
+ def wrapper(self, func, *args, **kwargs):
+ for retry in RetryManager(self.params.get('file_access_retries', 3), error_callback, fd=self):
+ try:
+ return func(self, *args, **kwargs)
+ except OSError as err:
+ if err.errno in (errno.EACCES, errno.EINVAL):
+ retry.error = err
+ continue
+ retry.error_callback(err, 1, 0)
+
+ return functools.partial(functools.partialmethod, wrapper)
+
+ @wrap_file_access('open', fatal=True)
+ def sanitize_open(self, filename, open_mode):
+ f, filename = sanitize_open(filename, open_mode)
+ if not getattr(f, 'locked', None):
+ self.write_debug(f'{LockingUnsupportedError.msg}. Proceeding without locking', only_once=True)
+ return f, filename
+
+ @wrap_file_access('remove')
+ def try_remove(self, filename):
+ if os.path.isfile(filename):
+ os.remove(filename)
+
+ @wrap_file_access('rename')
+ def try_rename(self, old_filename, new_filename):
+ if old_filename == new_filename:
+ return
+ os.replace(old_filename, new_filename)
+
+ def try_utime(self, filename, last_modified_hdr):
+ """Try to set the last-modified time of the given file."""
+ if last_modified_hdr is None:
+ return
+ if not os.path.isfile(encodeFilename(filename)):
+ return
+ timestr = last_modified_hdr
+ if timestr is None:
+ return
+ filetime = timeconvert(timestr)
+ if filetime is None:
+ return filetime
+ # Ignore obviously invalid dates
+ if filetime == 0:
+ return
+ with contextlib.suppress(Exception):
+ os.utime(filename, (time.time(), filetime))
+ return filetime
+
+ def report_destination(self, filename):
+ """Report destination filename."""
+ self.to_screen('[download] Destination: ' + filename)
+
+ def _prepare_multiline_status(self, lines=1):
+ if self.params.get('noprogress'):
+ self._multiline = QuietMultilinePrinter()
+ elif self.ydl.params.get('logger'):
+ self._multiline = MultilineLogger(self.ydl.params['logger'], lines)
+ elif self.params.get('progress_with_newline'):
+ self._multiline = BreaklineStatusPrinter(self.ydl._out_files.out, lines)
+ else:
+ self._multiline = MultilinePrinter(self.ydl._out_files.out, lines, not self.params.get('quiet'))
+ self._multiline.allow_colors = self.ydl._allow_colors.out and self.ydl._allow_colors.out != 'no_color'
+ self._multiline._HAVE_FULLCAP = self.ydl._allow_colors.out
+
+ def _finish_multiline_status(self):
+ self._multiline.end()
+
+ ProgressStyles = Namespace(
+ downloaded_bytes='light blue',
+ percent='light blue',
+ eta='yellow',
+ speed='green',
+ elapsed='bold white',
+ total_bytes='',
+ total_bytes_estimate='',
+ )
+
+ def _report_progress_status(self, s, default_template):
+ for name, style in self.ProgressStyles.items_:
+ name = f'_{name}_str'
+ if name not in s:
+ continue
+ s[name] = self._format_progress(s[name], style)
+ s['_default_template'] = default_template % s
+
+ progress_dict = s.copy()
+ progress_dict.pop('info_dict')
+ progress_dict = {'info': s['info_dict'], 'progress': progress_dict}
+
+ progress_template = self.params.get('progress_template', {})
+ self._multiline.print_at_line(self.ydl.evaluate_outtmpl(
+ progress_template.get('download') or '[download] %(progress._default_template)s',
+ progress_dict), s.get('progress_idx') or 0)
+ self.to_console_title(self.ydl.evaluate_outtmpl(
+ progress_template.get('download-title') or 'yt-dlp %(progress._default_template)s',
+ progress_dict))
+
+ def _format_progress(self, *args, **kwargs):
+ return self.ydl._format_text(
+ self._multiline.stream, self._multiline.allow_colors, *args, **kwargs)
+
+ def report_progress(self, s):
+ def with_fields(*tups, default=''):
+ for *fields, tmpl in tups:
+ if all(s.get(f) is not None for f in fields):
+ return tmpl
+ return default
+
+ _format_bytes = lambda k: f'{format_bytes(s.get(k)):>10s}'
+
+ if s['status'] == 'finished':
+ if self.params.get('noprogress'):
+ self.to_screen('[download] Download completed')
+ speed = try_call(lambda: s['total_bytes'] / s['elapsed'])
+ s.update({
+ 'speed': speed,
+ '_speed_str': self.format_speed(speed).strip(),
+ '_total_bytes_str': _format_bytes('total_bytes'),
+ '_elapsed_str': self.format_seconds(s.get('elapsed')),
+ '_percent_str': self.format_percent(100),
+ })
+ self._report_progress_status(s, join_nonempty(
+ '100%%',
+ with_fields(('total_bytes', 'of %(_total_bytes_str)s')),
+ with_fields(('elapsed', 'in %(_elapsed_str)s')),
+ with_fields(('speed', 'at %(_speed_str)s')),
+ delim=' '))
+
+ if s['status'] != 'downloading':
+ return
+
+ s.update({
+ '_eta_str': self.format_eta(s.get('eta')).strip(),
+ '_speed_str': self.format_speed(s.get('speed')),
+ '_percent_str': self.format_percent(try_call(
+ lambda: 100 * s['downloaded_bytes'] / s['total_bytes'],
+ lambda: 100 * s['downloaded_bytes'] / s['total_bytes_estimate'],
+ lambda: s['downloaded_bytes'] == 0 and 0)),
+ '_total_bytes_str': _format_bytes('total_bytes'),
+ '_total_bytes_estimate_str': _format_bytes('total_bytes_estimate'),
+ '_downloaded_bytes_str': _format_bytes('downloaded_bytes'),
+ '_elapsed_str': self.format_seconds(s.get('elapsed')),
+ })
+
+ msg_template = with_fields(
+ ('total_bytes', '%(_percent_str)s of %(_total_bytes_str)s at %(_speed_str)s ETA %(_eta_str)s'),
+ ('total_bytes_estimate', '%(_percent_str)s of ~%(_total_bytes_estimate_str)s at %(_speed_str)s ETA %(_eta_str)s'),
+ ('downloaded_bytes', 'elapsed', '%(_downloaded_bytes_str)s at %(_speed_str)s (%(_elapsed_str)s)'),
+ ('downloaded_bytes', '%(_downloaded_bytes_str)s at %(_speed_str)s'),
+ default='%(_percent_str)s at %(_speed_str)s ETA %(_eta_str)s')
+
+ msg_template += with_fields(
+ ('fragment_index', 'fragment_count', ' (frag %(fragment_index)s/%(fragment_count)s)'),
+ ('fragment_index', ' (frag %(fragment_index)s)'))
+ self._report_progress_status(s, msg_template)
+
+ def report_resuming_byte(self, resume_len):
+ """Report attempt to resume at given byte."""
+ self.to_screen('[download] Resuming download at byte %s' % resume_len)
+
+ def report_retry(self, err, count, retries, frag_index=NO_DEFAULT, fatal=True):
+ """Report retry"""
+ is_frag = False if frag_index is NO_DEFAULT else 'fragment'
+ RetryManager.report_retry(
+ err, count, retries, info=self.__to_screen,
+ warn=lambda msg: self.__to_screen(f'[download] Got error: {msg}'),
+ error=IDENTITY if not fatal else lambda e: self.report_error(f'\r[download] Got error: {e}'),
+ sleep_func=self.params.get('retry_sleep_functions', {}).get(is_frag or 'http'),
+ suffix=f'fragment{"s" if frag_index is None else f" {frag_index}"}' if is_frag else None)
+
+ def report_unable_to_resume(self):
+ """Report it was impossible to resume download."""
+ self.to_screen('[download] Unable to resume')
+
+ @staticmethod
+ def supports_manifest(manifest):
+ """ Whether the downloader can download the fragments from the manifest.
+ Redefine in subclasses if needed. """
+ pass
+
+ def download(self, filename, info_dict, subtitle=False):
+ """Download to a filename using the info from info_dict
+ Return True on success and False otherwise
+ """
+ nooverwrites_and_exists = (
+ not self.params.get('overwrites', True)
+ and os.path.exists(encodeFilename(filename))
+ )
+
+ if not hasattr(filename, 'write'):
+ continuedl_and_exists = (
+ self.params.get('continuedl', True)
+ and os.path.isfile(encodeFilename(filename))
+ and not self.params.get('nopart', False)
+ )
+
+ # Check file already present
+ if filename != '-' and (nooverwrites_and_exists or continuedl_and_exists):
+ self.report_file_already_downloaded(filename)
+ self._hook_progress({
+ 'filename': filename,
+ 'status': 'finished',
+ 'total_bytes': os.path.getsize(encodeFilename(filename)),
+ }, info_dict)
+ self._finish_multiline_status()
+ return True, False
+
+ if subtitle:
+ sleep_interval = self.params.get('sleep_interval_subtitles') or 0
+ else:
+ min_sleep_interval = self.params.get('sleep_interval') or 0
+ sleep_interval = random.uniform(
+ min_sleep_interval, self.params.get('max_sleep_interval') or min_sleep_interval)
+ if sleep_interval > 0:
+ self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...')
+ time.sleep(sleep_interval)
+
+ ret = self.real_download(filename, info_dict)
+ self._finish_multiline_status()
+ return ret, True
+
+ def real_download(self, filename, info_dict):
+ """Real download process. Redefine in subclasses."""
+ raise NotImplementedError('This method must be implemented by subclasses')
+
+ def _hook_progress(self, status, info_dict):
+ # Ideally we want to make a copy of the dict, but that is too slow
+ status['info_dict'] = info_dict
+ # youtube-dl passes the same status object to all the hooks.
+ # Some third party scripts seems to be relying on this.
+ # So keep this behavior if possible
+ for ph in self._progress_hooks:
+ ph(status)
+
+ def add_progress_hook(self, ph):
+ # See YoutubeDl.py (search for progress_hooks) for a description of
+ # this interface
+ self._progress_hooks.append(ph)
+
+ def _debug_cmd(self, args, exe=None):
+ if not self.params.get('verbose', False):
+ return
+
+ str_args = [decodeArgument(a) for a in args]
+
+ if exe is None:
+ exe = os.path.basename(str_args[0])
+
+ self.write_debug(f'{exe} command line: {shell_quote(str_args)}')
diff --git a/yt_dlp/downloader/dash.py b/yt_dlp/downloader/dash.py
new file mode 100644
index 0000000..afc79b6
--- /dev/null
+++ b/yt_dlp/downloader/dash.py
@@ -0,0 +1,90 @@
+import time
+import urllib.parse
+
+from . import get_suitable_downloader
+from .fragment import FragmentFD
+from ..utils import update_url_query, urljoin
+
+
+class DashSegmentsFD(FragmentFD):
+ """
+ Download segments in a DASH manifest. External downloaders can take over
+ the fragment downloads by supporting the 'dash_frag_urls' protocol
+ """
+
+ FD_NAME = 'dashsegments'
+
+ def real_download(self, filename, info_dict):
+ if 'http_dash_segments_generator' in info_dict['protocol'].split('+'):
+ real_downloader = None # No external FD can support --live-from-start
+ else:
+ if info_dict.get('is_live'):
+ self.report_error('Live DASH videos are not supported')
+ real_downloader = get_suitable_downloader(
+ info_dict, self.params, None, protocol='dash_frag_urls', to_stdout=(filename == '-'))
+
+ real_start = time.time()
+
+ requested_formats = [{**info_dict, **fmt} for fmt in info_dict.get('requested_formats', [])]
+ args = []
+ for fmt in requested_formats or [info_dict]:
+ try:
+ fragment_count = 1 if self.params.get('test') else len(fmt['fragments'])
+ except TypeError:
+ fragment_count = None
+ ctx = {
+ 'filename': fmt.get('filepath') or filename,
+ 'live': 'is_from_start' if fmt.get('is_from_start') else fmt.get('is_live'),
+ 'total_frags': fragment_count,
+ }
+
+ if real_downloader:
+ self._prepare_external_frag_download(ctx)
+ else:
+ self._prepare_and_start_frag_download(ctx, fmt)
+ ctx['start'] = real_start
+
+ extra_query = None
+ extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url')
+ if extra_param_to_segment_url:
+ extra_query = urllib.parse.parse_qs(extra_param_to_segment_url)
+
+ fragments_to_download = self._get_fragments(fmt, ctx, extra_query)
+
+ if real_downloader:
+ self.to_screen(
+ f'[{self.FD_NAME}] Fragment downloads will be delegated to {real_downloader.get_basename()}')
+ info_dict['fragments'] = list(fragments_to_download)
+ fd = real_downloader(self.ydl, self.params)
+ return fd.real_download(filename, info_dict)
+
+ args.append([ctx, fragments_to_download, fmt])
+
+ return self.download_and_append_fragments_multiple(*args, is_fatal=lambda idx: idx == 0)
+
+ def _resolve_fragments(self, fragments, ctx):
+ fragments = fragments(ctx) if callable(fragments) else fragments
+ return [next(iter(fragments))] if self.params.get('test') else fragments
+
+ def _get_fragments(self, fmt, ctx, extra_query):
+ fragment_base_url = fmt.get('fragment_base_url')
+ fragments = self._resolve_fragments(fmt['fragments'], ctx)
+
+ frag_index = 0
+ for i, fragment in enumerate(fragments):
+ frag_index += 1
+ if frag_index <= ctx['fragment_index']:
+ continue
+ fragment_url = fragment.get('url')
+ if not fragment_url:
+ assert fragment_base_url
+ fragment_url = urljoin(fragment_base_url, fragment['path'])
+ if extra_query:
+ fragment_url = update_url_query(fragment_url, extra_query)
+
+ yield {
+ 'frag_index': frag_index,
+ 'fragment_count': fragment.get('fragment_count'),
+ 'index': i,
+ 'url': fragment_url,
+ }
diff --git a/yt_dlp/downloader/external.py b/yt_dlp/downloader/external.py
new file mode 100644
index 0000000..ce5eeb0
--- /dev/null
+++ b/yt_dlp/downloader/external.py
@@ -0,0 +1,664 @@
+import enum
+import json
+import os
+import re
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+from .fragment import FragmentFD
+from ..compat import functools
+from ..networking import Request
+from ..postprocessor.ffmpeg import EXT_TO_OUT_FORMATS, FFmpegPostProcessor
+from ..utils import (
+ Popen,
+ RetryManager,
+ _configuration_args,
+ check_executable,
+ classproperty,
+ cli_bool_option,
+ cli_option,
+ cli_valueless_option,
+ determine_ext,
+ encodeArgument,
+ encodeFilename,
+ find_available_port,
+ remove_end,
+ traverse_obj,
+)
+
+
+class Features(enum.Enum):
+ TO_STDOUT = enum.auto()
+ MULTIPLE_FORMATS = enum.auto()
+
+
+class ExternalFD(FragmentFD):
+ SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps')
+ SUPPORTED_FEATURES = ()
+ _CAPTURE_STDERR = True
+
+ def real_download(self, filename, info_dict):
+ self.report_destination(filename)
+ tmpfilename = self.temp_name(filename)
+ self._cookies_tempfile = None
+
+ try:
+ started = time.time()
+ retval = self._call_downloader(tmpfilename, info_dict)
+ except KeyboardInterrupt:
+ if not info_dict.get('is_live'):
+ raise
+ # Live stream downloading cancellation should be considered as
+ # correct and expected termination thus all postprocessing
+ # should take place
+ retval = 0
+ self.to_screen('[%s] Interrupted by user' % self.get_basename())
+ finally:
+ if self._cookies_tempfile:
+ self.try_remove(self._cookies_tempfile)
+
+ if retval == 0:
+ status = {
+ 'filename': filename,
+ 'status': 'finished',
+ 'elapsed': time.time() - started,
+ }
+ if filename != '-':
+ fsize = os.path.getsize(encodeFilename(tmpfilename))
+ self.try_rename(tmpfilename, filename)
+ status.update({
+ 'downloaded_bytes': fsize,
+ 'total_bytes': fsize,
+ })
+ self._hook_progress(status, info_dict)
+ return True
+ else:
+ self.to_stderr('\n')
+ self.report_error('%s exited with code %d' % (
+ self.get_basename(), retval))
+ return False
+
+ @classmethod
+ def get_basename(cls):
+ return cls.__name__[:-2].lower()
+
+ @classproperty
+ def EXE_NAME(cls):
+ return cls.get_basename()
+
+ @functools.cached_property
+ def exe(self):
+ return self.EXE_NAME
+
+ @classmethod
+ def available(cls, path=None):
+ path = check_executable(
+ cls.EXE_NAME if path in (None, cls.get_basename()) else path,
+ [cls.AVAILABLE_OPT])
+ if not path:
+ return False
+ cls.exe = path
+ return path
+
+ @classmethod
+ def supports(cls, info_dict):
+ return all((
+ not info_dict.get('to_stdout') or Features.TO_STDOUT in cls.SUPPORTED_FEATURES,
+ '+' not in info_dict['protocol'] or Features.MULTIPLE_FORMATS in cls.SUPPORTED_FEATURES,
+ not traverse_obj(info_dict, ('hls_aes', ...), 'extra_param_to_segment_url'),
+ all(proto in cls.SUPPORTED_PROTOCOLS for proto in info_dict['protocol'].split('+')),
+ ))
+
+ @classmethod
+ def can_download(cls, info_dict, path=None):
+ return cls.available(path) and cls.supports(info_dict)
+
+ def _option(self, command_option, param):
+ return cli_option(self.params, command_option, param)
+
+ def _bool_option(self, command_option, param, true_value='true', false_value='false', separator=None):
+ return cli_bool_option(self.params, command_option, param, true_value, false_value, separator)
+
+ def _valueless_option(self, command_option, param, expected_value=True):
+ return cli_valueless_option(self.params, command_option, param, expected_value)
+
+ def _configuration_args(self, keys=None, *args, **kwargs):
+ return _configuration_args(
+ self.get_basename(), self.params.get('external_downloader_args'), self.EXE_NAME,
+ keys, *args, **kwargs)
+
+ def _write_cookies(self):
+ if not self.ydl.cookiejar.filename:
+ tmp_cookies = tempfile.NamedTemporaryFile(suffix='.cookies', delete=False)
+ tmp_cookies.close()
+ self._cookies_tempfile = tmp_cookies.name
+ self.to_screen(f'[download] Writing temporary cookies file to "{self._cookies_tempfile}"')
+ # real_download resets _cookies_tempfile; if it's None then save() will write to cookiejar.filename
+ self.ydl.cookiejar.save(self._cookies_tempfile)
+ return self.ydl.cookiejar.filename or self._cookies_tempfile
+
+ def _call_downloader(self, tmpfilename, info_dict):
+ """ Either overwrite this or implement _make_cmd """
+ cmd = [encodeArgument(a) for a in self._make_cmd(tmpfilename, info_dict)]
+
+ self._debug_cmd(cmd)
+
+ if 'fragments' not in info_dict:
+ _, stderr, returncode = self._call_process(cmd, info_dict)
+ if returncode and stderr:
+ self.to_stderr(stderr)
+ return returncode
+
+ skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
+
+ retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry,
+ frag_index=None, fatal=not skip_unavailable_fragments)
+ for retry in retry_manager:
+ _, stderr, returncode = self._call_process(cmd, info_dict)
+ if not returncode:
+ break
+ # TODO: Decide whether to retry based on error code
+ # https://aria2.github.io/manual/en/html/aria2c.html#exit-status
+ if stderr:
+ self.to_stderr(stderr)
+ retry.error = Exception()
+ continue
+ if not skip_unavailable_fragments and retry_manager.error:
+ return -1
+
+ decrypt_fragment = self.decrypter(info_dict)
+ dest, _ = self.sanitize_open(tmpfilename, 'wb')
+ for frag_index, fragment in enumerate(info_dict['fragments']):
+ fragment_filename = '%s-Frag%d' % (tmpfilename, frag_index)
+ try:
+ src, _ = self.sanitize_open(fragment_filename, 'rb')
+ except OSError as err:
+ if skip_unavailable_fragments and frag_index > 1:
+ self.report_skip_fragment(frag_index, err)
+ continue
+ self.report_error(f'Unable to open fragment {frag_index}; {err}')
+ return -1
+ dest.write(decrypt_fragment(fragment, src.read()))
+ src.close()
+ if not self.params.get('keep_fragments', False):
+ self.try_remove(encodeFilename(fragment_filename))
+ dest.close()
+ self.try_remove(encodeFilename('%s.frag.urls' % tmpfilename))
+ return 0
+
+ def _call_process(self, cmd, info_dict):
+ return Popen.run(cmd, text=True, stderr=subprocess.PIPE if self._CAPTURE_STDERR else None)
+
+
+class CurlFD(ExternalFD):
+ AVAILABLE_OPT = '-V'
+ _CAPTURE_STDERR = False # curl writes the progress to stderr
+
+ def _make_cmd(self, tmpfilename, info_dict):
+ cmd = [self.exe, '--location', '-o', tmpfilename, '--compressed']
+ cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url'])
+ if cookie_header:
+ cmd += ['--cookie', cookie_header]
+ if info_dict.get('http_headers') is not None:
+ for key, val in info_dict['http_headers'].items():
+ cmd += ['--header', f'{key}: {val}']
+
+ cmd += self._bool_option('--continue-at', 'continuedl', '-', '0')
+ cmd += self._valueless_option('--silent', 'noprogress')
+ cmd += self._valueless_option('--verbose', 'verbose')
+ cmd += self._option('--limit-rate', 'ratelimit')
+ retry = self._option('--retry', 'retries')
+ if len(retry) == 2:
+ if retry[1] in ('inf', 'infinite'):
+ retry[1] = '2147483647'
+ cmd += retry
+ cmd += self._option('--max-filesize', 'max_filesize')
+ cmd += self._option('--interface', 'source_address')
+ cmd += self._option('--proxy', 'proxy')
+ cmd += self._valueless_option('--insecure', 'nocheckcertificate')
+ cmd += self._configuration_args()
+ cmd += ['--', info_dict['url']]
+ return cmd
+
+
+class AxelFD(ExternalFD):
+ AVAILABLE_OPT = '-V'
+
+ def _make_cmd(self, tmpfilename, info_dict):
+ cmd = [self.exe, '-o', tmpfilename]
+ if info_dict.get('http_headers') is not None:
+ for key, val in info_dict['http_headers'].items():
+ cmd += ['-H', f'{key}: {val}']
+ cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url'])
+ if cookie_header:
+ cmd += ['-H', f'Cookie: {cookie_header}', '--max-redirect=0']
+ cmd += self._configuration_args()
+ cmd += ['--', info_dict['url']]
+ return cmd
+
+
+class WgetFD(ExternalFD):
+ AVAILABLE_OPT = '--version'
+
+ def _make_cmd(self, tmpfilename, info_dict):
+ cmd = [self.exe, '-O', tmpfilename, '-nv', '--compression=auto']
+ if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
+ cmd += ['--load-cookies', self._write_cookies()]
+ if info_dict.get('http_headers') is not None:
+ for key, val in info_dict['http_headers'].items():
+ cmd += ['--header', f'{key}: {val}']
+ cmd += self._option('--limit-rate', 'ratelimit')
+ retry = self._option('--tries', 'retries')
+ if len(retry) == 2:
+ if retry[1] in ('inf', 'infinite'):
+ retry[1] = '0'
+ cmd += retry
+ cmd += self._option('--bind-address', 'source_address')
+ proxy = self.params.get('proxy')
+ if proxy:
+ for var in ('http_proxy', 'https_proxy'):
+ cmd += ['--execute', f'{var}={proxy}']
+ cmd += self._valueless_option('--no-check-certificate', 'nocheckcertificate')
+ cmd += self._configuration_args()
+ cmd += ['--', info_dict['url']]
+ return cmd
+
+
+class Aria2cFD(ExternalFD):
+ AVAILABLE_OPT = '-v'
+ SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps', 'dash_frag_urls', 'm3u8_frag_urls')
+
+ @staticmethod
+ def supports_manifest(manifest):
+ UNSUPPORTED_FEATURES = [
+ r'#EXT-X-BYTERANGE', # playlists composed of byte ranges of media files [1]
+ # 1. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.2
+ ]
+ check_results = (not re.search(feature, manifest) for feature in UNSUPPORTED_FEATURES)
+ return all(check_results)
+
+ @staticmethod
+ def _aria2c_filename(fn):
+ return fn if os.path.isabs(fn) else f'.{os.path.sep}{fn}'
+
+ def _call_downloader(self, tmpfilename, info_dict):
+ # FIXME: Disabled due to https://github.com/yt-dlp/yt-dlp/issues/5931
+ if False and 'no-external-downloader-progress' not in self.params.get('compat_opts', []):
+ info_dict['__rpc'] = {
+ 'port': find_available_port() or 19190,
+ 'secret': str(uuid.uuid4()),
+ }
+ return super()._call_downloader(tmpfilename, info_dict)
+
+ def _make_cmd(self, tmpfilename, info_dict):
+ cmd = [self.exe, '-c', '--no-conf',
+ '--console-log-level=warn', '--summary-interval=0', '--download-result=hide',
+ '--http-accept-gzip=true', '--file-allocation=none', '-x16', '-j16', '-s16']
+ if 'fragments' in info_dict:
+ cmd += ['--allow-overwrite=true', '--allow-piece-length-change=true']
+ else:
+ cmd += ['--min-split-size', '1M']
+
+ if self.ydl.cookiejar.get_cookie_header(info_dict['url']):
+ cmd += [f'--load-cookies={self._write_cookies()}']
+ if info_dict.get('http_headers') is not None:
+ for key, val in info_dict['http_headers'].items():
+ cmd += ['--header', f'{key}: {val}']
+ cmd += self._option('--max-overall-download-limit', 'ratelimit')
+ cmd += self._option('--interface', 'source_address')
+ cmd += self._option('--all-proxy', 'proxy')
+ cmd += self._bool_option('--check-certificate', 'nocheckcertificate', 'false', 'true', '=')
+ cmd += self._bool_option('--remote-time', 'updatetime', 'true', 'false', '=')
+ cmd += self._bool_option('--show-console-readout', 'noprogress', 'false', 'true', '=')
+ cmd += self._configuration_args()
+
+ if '__rpc' in info_dict:
+ cmd += [
+ '--enable-rpc',
+ f'--rpc-listen-port={info_dict["__rpc"]["port"]}',
+ f'--rpc-secret={info_dict["__rpc"]["secret"]}']
+
+ # aria2c strips out spaces from the beginning/end of filenames and paths.
+ # We work around this issue by adding a "./" to the beginning of the
+ # filename and relative path, and adding a "/" at the end of the path.
+ # See: https://github.com/yt-dlp/yt-dlp/issues/276
+ # https://github.com/ytdl-org/youtube-dl/issues/20312
+ # https://github.com/aria2/aria2/issues/1373
+ dn = os.path.dirname(tmpfilename)
+ if dn:
+ cmd += ['--dir', self._aria2c_filename(dn) + os.path.sep]
+ if 'fragments' not in info_dict:
+ cmd += ['--out', self._aria2c_filename(os.path.basename(tmpfilename))]
+ cmd += ['--auto-file-renaming=false']
+
+ if 'fragments' in info_dict:
+ cmd += ['--uri-selector=inorder']
+ url_list_file = '%s.frag.urls' % tmpfilename
+ url_list = []
+ for frag_index, fragment in enumerate(info_dict['fragments']):
+ fragment_filename = '%s-Frag%d' % (os.path.basename(tmpfilename), frag_index)
+ url_list.append('%s\n\tout=%s' % (fragment['url'], self._aria2c_filename(fragment_filename)))
+ stream, _ = self.sanitize_open(url_list_file, 'wb')
+ stream.write('\n'.join(url_list).encode())
+ stream.close()
+ cmd += ['-i', self._aria2c_filename(url_list_file)]
+ else:
+ cmd += ['--', info_dict['url']]
+ return cmd
+
+ def aria2c_rpc(self, rpc_port, rpc_secret, method, params=()):
+ # Does not actually need to be UUID, just unique
+ sanitycheck = str(uuid.uuid4())
+ d = json.dumps({
+ 'jsonrpc': '2.0',
+ 'id': sanitycheck,
+ 'method': method,
+ 'params': [f'token:{rpc_secret}', *params],
+ }).encode('utf-8')
+ request = Request(
+ f'http://localhost:{rpc_port}/jsonrpc',
+ data=d, headers={
+ 'Content-Type': 'application/json',
+ 'Content-Length': f'{len(d)}',
+ }, proxies={'all': None})
+ with self.ydl.urlopen(request) as r:
+ resp = json.load(r)
+ assert resp.get('id') == sanitycheck, 'Something went wrong with RPC server'
+ return resp['result']
+
+ def _call_process(self, cmd, info_dict):
+ if '__rpc' not in info_dict:
+ return super()._call_process(cmd, info_dict)
+
+ send_rpc = functools.partial(self.aria2c_rpc, info_dict['__rpc']['port'], info_dict['__rpc']['secret'])
+ started = time.time()
+
+ fragmented = 'fragments' in info_dict
+ frag_count = len(info_dict['fragments']) if fragmented else 1
+ status = {
+ 'filename': info_dict.get('_filename'),
+ 'status': 'downloading',
+ 'elapsed': 0,
+ 'downloaded_bytes': 0,
+ 'fragment_count': frag_count if fragmented else None,
+ 'fragment_index': 0 if fragmented else None,
+ }
+ self._hook_progress(status, info_dict)
+
+ def get_stat(key, *obj, average=False):
+ val = tuple(filter(None, map(float, traverse_obj(obj, (..., ..., key))))) or [0]
+ return sum(val) / (len(val) if average else 1)
+
+ with Popen(cmd, text=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) as p:
+ # Add a small sleep so that RPC client can receive response,
+ # or the connection stalls infinitely
+ time.sleep(0.2)
+ retval = p.poll()
+ while retval is None:
+ # We don't use tellStatus as we won't know the GID without reading stdout
+ # Ref: https://aria2.github.io/manual/en/html/aria2c.html#aria2.tellActive
+ active = send_rpc('aria2.tellActive')
+ completed = send_rpc('aria2.tellStopped', [0, frag_count])
+
+ downloaded = get_stat('totalLength', completed) + get_stat('completedLength', active)
+ speed = get_stat('downloadSpeed', active)
+ total = frag_count * get_stat('totalLength', active, completed, average=True)
+ if total < downloaded:
+ total = None
+
+ status.update({
+ 'downloaded_bytes': int(downloaded),
+ 'speed': speed,
+ 'total_bytes': None if fragmented else total,
+ 'total_bytes_estimate': total,
+ 'eta': (total - downloaded) / (speed or 1),
+ 'fragment_index': min(frag_count, len(completed) + 1) if fragmented else None,
+ 'elapsed': time.time() - started
+ })
+ self._hook_progress(status, info_dict)
+
+ if not active and len(completed) >= frag_count:
+ send_rpc('aria2.shutdown')
+ retval = p.wait()
+ break
+
+ time.sleep(0.1)
+ retval = p.poll()
+
+ return '', p.stderr.read(), retval
+
+
+class HttpieFD(ExternalFD):
+ AVAILABLE_OPT = '--version'
+ EXE_NAME = 'http'
+
+ def _make_cmd(self, tmpfilename, info_dict):
+ cmd = ['http', '--download', '--output', tmpfilename, info_dict['url']]
+
+ if info_dict.get('http_headers') is not None:
+ for key, val in info_dict['http_headers'].items():
+ cmd += [f'{key}:{val}']
+
+ # httpie 3.1.0+ removes the Cookie header on redirect, so this should be safe for now. [1]
+ # If we ever need cookie handling for redirects, we can export the cookiejar into a session. [2]
+ # 1: https://github.com/httpie/httpie/security/advisories/GHSA-9w4w-cpc8-h2fq
+ # 2: https://httpie.io/docs/cli/sessions
+ cookie_header = self.ydl.cookiejar.get_cookie_header(info_dict['url'])
+ if cookie_header:
+ cmd += [f'Cookie:{cookie_header}']
+ return cmd
+
+
+class FFmpegFD(ExternalFD):
+ SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps', 'm3u8', 'm3u8_native', 'rtsp', 'rtmp', 'rtmp_ffmpeg', 'mms', 'http_dash_segments')
+ SUPPORTED_FEATURES = (Features.TO_STDOUT, Features.MULTIPLE_FORMATS)
+
+ @classmethod
+ def available(cls, path=None):
+ # TODO: Fix path for ffmpeg
+ # Fixme: This may be wrong when --ffmpeg-location is used
+ return FFmpegPostProcessor().available
+
+ def on_process_started(self, proc, stdin):
+ """ Override this in subclasses """
+ pass
+
+ @classmethod
+ def can_merge_formats(cls, info_dict, params):
+ return (
+ info_dict.get('requested_formats')
+ and info_dict.get('protocol')
+ and not params.get('allow_unplayable_formats')
+ and 'no-direct-merge' not in params.get('compat_opts', [])
+ and cls.can_download(info_dict))
+
+ def _call_downloader(self, tmpfilename, info_dict):
+ ffpp = FFmpegPostProcessor(downloader=self)
+ if not ffpp.available:
+ self.report_error('m3u8 download detected but ffmpeg could not be found. Please install')
+ return False
+ ffpp.check_version()
+
+ args = [ffpp.executable, '-y']
+
+ for log_level in ('quiet', 'verbose'):
+ if self.params.get(log_level, False):
+ args += ['-loglevel', log_level]
+ break
+ if not self.params.get('verbose'):
+ args += ['-hide_banner']
+
+ args += traverse_obj(info_dict, ('downloader_options', 'ffmpeg_args'), default=[])
+
+ # These exists only for compatibility. Extractors should use
+ # info_dict['downloader_options']['ffmpeg_args'] instead
+ args += info_dict.get('_ffmpeg_args') or []
+ seekable = info_dict.get('_seekable')
+ if seekable is not None:
+ # setting -seekable prevents ffmpeg from guessing if the server
+ # supports seeking(by adding the header `Range: bytes=0-`), which
+ # can cause problems in some cases
+ # https://github.com/ytdl-org/youtube-dl/issues/11800#issuecomment-275037127
+ # http://trac.ffmpeg.org/ticket/6125#comment:10
+ args += ['-seekable', '1' if seekable else '0']
+
+ env = None
+ proxy = self.params.get('proxy')
+ if proxy:
+ if not re.match(r'^[\da-zA-Z]+://', proxy):
+ proxy = 'http://%s' % proxy
+
+ if proxy.startswith('socks'):
+ self.report_warning(
+ '%s does not support SOCKS proxies. Downloading is likely to fail. '
+ 'Consider adding --hls-prefer-native to your command.' % self.get_basename())
+
+ # Since December 2015 ffmpeg supports -http_proxy option (see
+ # http://git.videolan.org/?p=ffmpeg.git;a=commit;h=b4eb1f29ebddd60c41a2eb39f5af701e38e0d3fd)
+ # We could switch to the following code if we are able to detect version properly
+ # args += ['-http_proxy', proxy]
+ env = os.environ.copy()
+ env['HTTP_PROXY'] = proxy
+ env['http_proxy'] = proxy
+
+ protocol = info_dict.get('protocol')
+
+ if protocol == 'rtmp':
+ player_url = info_dict.get('player_url')
+ page_url = info_dict.get('page_url')
+ app = info_dict.get('app')
+ play_path = info_dict.get('play_path')
+ tc_url = info_dict.get('tc_url')
+ flash_version = info_dict.get('flash_version')
+ live = info_dict.get('rtmp_live', False)
+ conn = info_dict.get('rtmp_conn')
+ if player_url is not None:
+ args += ['-rtmp_swfverify', player_url]
+ if page_url is not None:
+ args += ['-rtmp_pageurl', page_url]
+ if app is not None:
+ args += ['-rtmp_app', app]
+ if play_path is not None:
+ args += ['-rtmp_playpath', play_path]
+ if tc_url is not None:
+ args += ['-rtmp_tcurl', tc_url]
+ if flash_version is not None:
+ args += ['-rtmp_flashver', flash_version]
+ if live:
+ args += ['-rtmp_live', 'live']
+ if isinstance(conn, list):
+ for entry in conn:
+ args += ['-rtmp_conn', entry]
+ elif isinstance(conn, str):
+ args += ['-rtmp_conn', conn]
+
+ start_time, end_time = info_dict.get('section_start') or 0, info_dict.get('section_end')
+
+ selected_formats = info_dict.get('requested_formats') or [info_dict]
+ for i, fmt in enumerate(selected_formats):
+ is_http = re.match(r'^https?://', fmt['url'])
+ cookies = self.ydl.cookiejar.get_cookies_for_url(fmt['url']) if is_http else []
+ if cookies:
+ args.extend(['-cookies', ''.join(
+ f'{cookie.name}={cookie.value}; path={cookie.path}; domain={cookie.domain};\r\n'
+ for cookie in cookies)])
+ if fmt.get('http_headers') and is_http:
+ # Trailing \r\n after each HTTP header is important to prevent warning from ffmpeg/avconv:
+ # [http @ 00000000003d2fa0] No trailing CRLF found in HTTP header.
+ args.extend(['-headers', ''.join(f'{key}: {val}\r\n' for key, val in fmt['http_headers'].items())])
+
+ if start_time:
+ args += ['-ss', str(start_time)]
+ if end_time:
+ args += ['-t', str(end_time - start_time)]
+
+ args += self._configuration_args((f'_i{i + 1}', '_i')) + ['-i', fmt['url']]
+
+ if not (start_time or end_time) or not self.params.get('force_keyframes_at_cuts'):
+ args += ['-c', 'copy']
+
+ if info_dict.get('requested_formats') or protocol == 'http_dash_segments':
+ for i, fmt in enumerate(selected_formats):
+ stream_number = fmt.get('manifest_stream_number', 0)
+ args.extend(['-map', f'{i}:{stream_number}'])
+
+ if self.params.get('test', False):
+ args += ['-fs', str(self._TEST_FILE_SIZE)]
+
+ ext = info_dict['ext']
+ if protocol in ('m3u8', 'm3u8_native'):
+ use_mpegts = (tmpfilename == '-') or self.params.get('hls_use_mpegts')
+ if use_mpegts is None:
+ use_mpegts = info_dict.get('is_live')
+ if use_mpegts:
+ args += ['-f', 'mpegts']
+ else:
+ args += ['-f', 'mp4']
+ if (ffpp.basename == 'ffmpeg' and ffpp._features.get('needs_adtstoasc')) and (not info_dict.get('acodec') or info_dict['acodec'].split('.')[0] in ('aac', 'mp4a')):
+ args += ['-bsf:a', 'aac_adtstoasc']
+ elif protocol == 'rtmp':
+ args += ['-f', 'flv']
+ elif ext == 'mp4' and tmpfilename == '-':
+ args += ['-f', 'mpegts']
+ elif ext == 'unknown_video':
+ ext = determine_ext(remove_end(tmpfilename, '.part'))
+ if ext == 'unknown_video':
+ self.report_warning(
+ 'The video format is unknown and cannot be downloaded by ffmpeg. '
+ 'Explicitly set the extension in the filename to attempt download in that format')
+ else:
+ self.report_warning(f'The video format is unknown. Trying to download as {ext} according to the filename')
+ args += ['-f', EXT_TO_OUT_FORMATS.get(ext, ext)]
+ else:
+ args += ['-f', EXT_TO_OUT_FORMATS.get(ext, ext)]
+
+ args += self._configuration_args(('_o1', '_o', ''))
+
+ args = [encodeArgument(opt) for opt in args]
+ args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True))
+ self._debug_cmd(args)
+
+ piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats)
+ with Popen(args, stdin=subprocess.PIPE, env=env) as proc:
+ if piped:
+ self.on_process_started(proc, proc.stdin)
+ try:
+ retval = proc.wait()
+ except BaseException as e:
+ # subprocces.run would send the SIGKILL signal to ffmpeg and the
+ # mp4 file couldn't be played, but if we ask ffmpeg to quit it
+ # produces a file that is playable (this is mostly useful for live
+ # streams). Note that Windows is not affected and produces playable
+ # files (see https://github.com/ytdl-org/youtube-dl/issues/8300).
+ if isinstance(e, KeyboardInterrupt) and sys.platform != 'win32' and not piped:
+ proc.communicate_or_kill(b'q')
+ else:
+ proc.kill(timeout=None)
+ raise
+ return retval
+
+
+class AVconvFD(FFmpegFD):
+ pass
+
+
+_BY_NAME = {
+ klass.get_basename(): klass
+ for name, klass in globals().items()
+ if name.endswith('FD') and name not in ('ExternalFD', 'FragmentFD')
+}
+
+
+def list_external_downloaders():
+ return sorted(_BY_NAME.keys())
+
+
+def get_external_downloader(external_downloader):
+ """ Given the name of the executable, see whether we support the given downloader """
+ bn = os.path.splitext(os.path.basename(external_downloader))[0]
+ return _BY_NAME.get(bn) or next((
+ klass for klass in _BY_NAME.values() if klass.EXE_NAME in bn
+ ), None)
diff --git a/yt_dlp/downloader/f4m.py b/yt_dlp/downloader/f4m.py
new file mode 100644
index 0000000..28cbba0
--- /dev/null
+++ b/yt_dlp/downloader/f4m.py
@@ -0,0 +1,427 @@
+import base64
+import io
+import itertools
+import struct
+import time
+import urllib.parse
+
+from .fragment import FragmentFD
+from ..compat import compat_etree_fromstring
+from ..networking.exceptions import HTTPError
+from ..utils import fix_xml_ampersands, xpath_text
+
+
+class DataTruncatedError(Exception):
+ pass
+
+
+class FlvReader(io.BytesIO):
+ """
+ Reader for Flv files
+ The file format is documented in https://www.adobe.com/devnet/f4v.html
+ """
+
+ def read_bytes(self, n):
+ data = self.read(n)
+ if len(data) < n:
+ raise DataTruncatedError(
+ 'FlvReader error: need %d bytes while only %d bytes got' % (
+ n, len(data)))
+ return data
+
+ # Utility functions for reading numbers and strings
+ def read_unsigned_long_long(self):
+ return struct.unpack('!Q', self.read_bytes(8))[0]
+
+ def read_unsigned_int(self):
+ return struct.unpack('!I', self.read_bytes(4))[0]
+
+ def read_unsigned_char(self):
+ return struct.unpack('!B', self.read_bytes(1))[0]
+
+ def read_string(self):
+ res = b''
+ while True:
+ char = self.read_bytes(1)
+ if char == b'\x00':
+ break
+ res += char
+ return res
+
+ def read_box_info(self):
+ """
+ Read a box and return the info as a tuple: (box_size, box_type, box_data)
+ """
+ real_size = size = self.read_unsigned_int()
+ box_type = self.read_bytes(4)
+ header_end = 8
+ if size == 1:
+ real_size = self.read_unsigned_long_long()
+ header_end = 16
+ return real_size, box_type, self.read_bytes(real_size - header_end)
+
+ def read_asrt(self):
+ # version
+ self.read_unsigned_char()
+ # flags
+ self.read_bytes(3)
+ quality_entry_count = self.read_unsigned_char()
+ # QualityEntryCount
+ for i in range(quality_entry_count):
+ self.read_string()
+
+ segment_run_count = self.read_unsigned_int()
+ segments = []
+ for i in range(segment_run_count):
+ first_segment = self.read_unsigned_int()
+ fragments_per_segment = self.read_unsigned_int()
+ segments.append((first_segment, fragments_per_segment))
+
+ return {
+ 'segment_run': segments,
+ }
+
+ def read_afrt(self):
+ # version
+ self.read_unsigned_char()
+ # flags
+ self.read_bytes(3)
+ # time scale
+ self.read_unsigned_int()
+
+ quality_entry_count = self.read_unsigned_char()
+ # QualitySegmentUrlModifiers
+ for i in range(quality_entry_count):
+ self.read_string()
+
+ fragments_count = self.read_unsigned_int()
+ fragments = []
+ for i in range(fragments_count):
+ first = self.read_unsigned_int()
+ first_ts = self.read_unsigned_long_long()
+ duration = self.read_unsigned_int()
+ if duration == 0:
+ discontinuity_indicator = self.read_unsigned_char()
+ else:
+ discontinuity_indicator = None
+ fragments.append({
+ 'first': first,
+ 'ts': first_ts,
+ 'duration': duration,
+ 'discontinuity_indicator': discontinuity_indicator,
+ })
+
+ return {
+ 'fragments': fragments,
+ }
+
+ def read_abst(self):
+ # version
+ self.read_unsigned_char()
+ # flags
+ self.read_bytes(3)
+
+ self.read_unsigned_int() # BootstrapinfoVersion
+ # Profile,Live,Update,Reserved
+ flags = self.read_unsigned_char()
+ live = flags & 0x20 != 0
+ # time scale
+ self.read_unsigned_int()
+ # CurrentMediaTime
+ self.read_unsigned_long_long()
+ # SmpteTimeCodeOffset
+ self.read_unsigned_long_long()
+
+ self.read_string() # MovieIdentifier
+ server_count = self.read_unsigned_char()
+ # ServerEntryTable
+ for i in range(server_count):
+ self.read_string()
+ quality_count = self.read_unsigned_char()
+ # QualityEntryTable
+ for i in range(quality_count):
+ self.read_string()
+ # DrmData
+ self.read_string()
+ # MetaData
+ self.read_string()
+
+ segments_count = self.read_unsigned_char()
+ segments = []
+ for i in range(segments_count):
+ box_size, box_type, box_data = self.read_box_info()
+ assert box_type == b'asrt'
+ segment = FlvReader(box_data).read_asrt()
+ segments.append(segment)
+ fragments_run_count = self.read_unsigned_char()
+ fragments = []
+ for i in range(fragments_run_count):
+ box_size, box_type, box_data = self.read_box_info()
+ assert box_type == b'afrt'
+ fragments.append(FlvReader(box_data).read_afrt())
+
+ return {
+ 'segments': segments,
+ 'fragments': fragments,
+ 'live': live,
+ }
+
+ def read_bootstrap_info(self):
+ total_size, box_type, box_data = self.read_box_info()
+ assert box_type == b'abst'
+ return FlvReader(box_data).read_abst()
+
+
+def read_bootstrap_info(bootstrap_bytes):
+ return FlvReader(bootstrap_bytes).read_bootstrap_info()
+
+
+def build_fragments_list(boot_info):
+ """ Return a list of (segment, fragment) for each fragment in the video """
+ res = []
+ segment_run_table = boot_info['segments'][0]
+ fragment_run_entry_table = boot_info['fragments'][0]['fragments']
+ first_frag_number = fragment_run_entry_table[0]['first']
+ fragments_counter = itertools.count(first_frag_number)
+ for segment, fragments_count in segment_run_table['segment_run']:
+ # In some live HDS streams (e.g. Rai), `fragments_count` is
+ # abnormal and causing out-of-memory errors. It's OK to change the
+ # number of fragments for live streams as they are updated periodically
+ if fragments_count == 4294967295 and boot_info['live']:
+ fragments_count = 2
+ for _ in range(fragments_count):
+ res.append((segment, next(fragments_counter)))
+
+ if boot_info['live']:
+ res = res[-2:]
+
+ return res
+
+
+def write_unsigned_int(stream, val):
+ stream.write(struct.pack('!I', val))
+
+
+def write_unsigned_int_24(stream, val):
+ stream.write(struct.pack('!I', val)[1:])
+
+
+def write_flv_header(stream):
+ """Writes the FLV header to stream"""
+ # FLV header
+ stream.write(b'FLV\x01')
+ stream.write(b'\x05')
+ stream.write(b'\x00\x00\x00\x09')
+ stream.write(b'\x00\x00\x00\x00')
+
+
+def write_metadata_tag(stream, metadata):
+ """Writes optional metadata tag to stream"""
+ SCRIPT_TAG = b'\x12'
+ FLV_TAG_HEADER_LEN = 11
+
+ if metadata:
+ stream.write(SCRIPT_TAG)
+ write_unsigned_int_24(stream, len(metadata))
+ stream.write(b'\x00\x00\x00\x00\x00\x00\x00')
+ stream.write(metadata)
+ write_unsigned_int(stream, FLV_TAG_HEADER_LEN + len(metadata))
+
+
+def remove_encrypted_media(media):
+ return list(filter(lambda e: 'drmAdditionalHeaderId' not in e.attrib
+ and 'drmAdditionalHeaderSetId' not in e.attrib,
+ media))
+
+
+def _add_ns(prop, ver=1):
+ return '{http://ns.adobe.com/f4m/%d.0}%s' % (ver, prop)
+
+
+def get_base_url(manifest):
+ base_url = xpath_text(
+ manifest, [_add_ns('baseURL'), _add_ns('baseURL', 2)],
+ 'base URL', default=None)
+ if base_url:
+ base_url = base_url.strip()
+ return base_url
+
+
+class F4mFD(FragmentFD):
+ """
+ A downloader for f4m manifests or AdobeHDS.
+ """
+
+ def _get_unencrypted_media(self, doc):
+ media = doc.findall(_add_ns('media'))
+ if not media:
+ self.report_error('No media found')
+ if not self.params.get('allow_unplayable_formats'):
+ for e in (doc.findall(_add_ns('drmAdditionalHeader'))
+ + doc.findall(_add_ns('drmAdditionalHeaderSet'))):
+ # If id attribute is missing it's valid for all media nodes
+ # without drmAdditionalHeaderId or drmAdditionalHeaderSetId attribute
+ if 'id' not in e.attrib:
+ self.report_error('Missing ID in f4m DRM')
+ media = remove_encrypted_media(media)
+ if not media:
+ self.report_error('Unsupported DRM')
+ return media
+
+ def _get_bootstrap_from_url(self, bootstrap_url):
+ bootstrap = self.ydl.urlopen(bootstrap_url).read()
+ return read_bootstrap_info(bootstrap)
+
+ def _update_live_fragments(self, bootstrap_url, latest_fragment):
+ fragments_list = []
+ retries = 30
+ while (not fragments_list) and (retries > 0):
+ boot_info = self._get_bootstrap_from_url(bootstrap_url)
+ fragments_list = build_fragments_list(boot_info)
+ fragments_list = [f for f in fragments_list if f[1] > latest_fragment]
+ if not fragments_list:
+ # Retry after a while
+ time.sleep(5.0)
+ retries -= 1
+
+ if not fragments_list:
+ self.report_error('Failed to update fragments')
+
+ return fragments_list
+
+ def _parse_bootstrap_node(self, node, base_url):
+ # Sometimes non empty inline bootstrap info can be specified along
+ # with bootstrap url attribute (e.g. dummy inline bootstrap info
+ # contains whitespace characters in [1]). We will prefer bootstrap
+ # url over inline bootstrap info when present.
+ # 1. http://live-1-1.rutube.ru/stream/1024/HDS/SD/C2NKsS85HQNckgn5HdEmOQ/1454167650/S-s604419906/move/four/dirs/upper/1024-576p.f4m
+ bootstrap_url = node.get('url')
+ if bootstrap_url:
+ bootstrap_url = urllib.parse.urljoin(
+ base_url, bootstrap_url)
+ boot_info = self._get_bootstrap_from_url(bootstrap_url)
+ else:
+ bootstrap_url = None
+ bootstrap = base64.b64decode(node.text)
+ boot_info = read_bootstrap_info(bootstrap)
+ return boot_info, bootstrap_url
+
+ def real_download(self, filename, info_dict):
+ man_url = info_dict['url']
+ requested_bitrate = info_dict.get('tbr')
+ self.to_screen('[%s] Downloading f4m manifest' % self.FD_NAME)
+
+ urlh = self.ydl.urlopen(self._prepare_url(info_dict, man_url))
+ man_url = urlh.url
+ # Some manifests may be malformed, e.g. prosiebensat1 generated manifests
+ # (see https://github.com/ytdl-org/youtube-dl/issues/6215#issuecomment-121704244
+ # and https://github.com/ytdl-org/youtube-dl/issues/7823)
+ manifest = fix_xml_ampersands(urlh.read().decode('utf-8', 'ignore')).strip()
+
+ doc = compat_etree_fromstring(manifest)
+ formats = [(int(f.attrib.get('bitrate', -1)), f)
+ for f in self._get_unencrypted_media(doc)]
+ if requested_bitrate is None or len(formats) == 1:
+ # get the best format
+ formats = sorted(formats, key=lambda f: f[0])
+ rate, media = formats[-1]
+ else:
+ rate, media = list(filter(
+ lambda f: int(f[0]) == requested_bitrate, formats))[0]
+
+ # Prefer baseURL for relative URLs as per 11.2 of F4M 3.0 spec.
+ man_base_url = get_base_url(doc) or man_url
+
+ base_url = urllib.parse.urljoin(man_base_url, media.attrib['url'])
+ bootstrap_node = doc.find(_add_ns('bootstrapInfo'))
+ boot_info, bootstrap_url = self._parse_bootstrap_node(
+ bootstrap_node, man_base_url)
+ live = boot_info['live']
+ metadata_node = media.find(_add_ns('metadata'))
+ if metadata_node is not None:
+ metadata = base64.b64decode(metadata_node.text)
+ else:
+ metadata = None
+
+ fragments_list = build_fragments_list(boot_info)
+ test = self.params.get('test', False)
+ if test:
+ # We only download the first fragment
+ fragments_list = fragments_list[:1]
+ total_frags = len(fragments_list)
+ # For some akamai manifests we'll need to add a query to the fragment url
+ akamai_pv = xpath_text(doc, _add_ns('pv-2.0'))
+
+ ctx = {
+ 'filename': filename,
+ 'total_frags': total_frags,
+ 'live': bool(live),
+ }
+
+ self._prepare_frag_download(ctx)
+
+ dest_stream = ctx['dest_stream']
+
+ if ctx['complete_frags_downloaded_bytes'] == 0:
+ write_flv_header(dest_stream)
+ if not live:
+ write_metadata_tag(dest_stream, metadata)
+
+ base_url_parsed = urllib.parse.urlparse(base_url)
+
+ self._start_frag_download(ctx, info_dict)
+
+ frag_index = 0
+ while fragments_list:
+ seg_i, frag_i = fragments_list.pop(0)
+ frag_index += 1
+ if frag_index <= ctx['fragment_index']:
+ continue
+ name = 'Seg%d-Frag%d' % (seg_i, frag_i)
+ query = []
+ if base_url_parsed.query:
+ query.append(base_url_parsed.query)
+ if akamai_pv:
+ query.append(akamai_pv.strip(';'))
+ if info_dict.get('extra_param_to_segment_url'):
+ query.append(info_dict['extra_param_to_segment_url'])
+ url_parsed = base_url_parsed._replace(path=base_url_parsed.path + name, query='&'.join(query))
+ try:
+ success = self._download_fragment(ctx, url_parsed.geturl(), info_dict)
+ if not success:
+ return False
+ down_data = self._read_fragment(ctx)
+ reader = FlvReader(down_data)
+ while True:
+ try:
+ _, box_type, box_data = reader.read_box_info()
+ except DataTruncatedError:
+ if test:
+ # In tests, segments may be truncated, and thus
+ # FlvReader may not be able to parse the whole
+ # chunk. If so, write the segment as is
+ # See https://github.com/ytdl-org/youtube-dl/issues/9214
+ dest_stream.write(down_data)
+ break
+ raise
+ if box_type == b'mdat':
+ self._append_fragment(ctx, box_data)
+ break
+ except HTTPError as err:
+ if live and (err.status == 404 or err.status == 410):
+ # We didn't keep up with the live window. Continue
+ # with the next available fragment.
+ msg = 'Fragment %d unavailable' % frag_i
+ self.report_warning(msg)
+ fragments_list = []
+ else:
+ raise
+
+ if not fragments_list and not test and live and bootstrap_url:
+ fragments_list = self._update_live_fragments(bootstrap_url, frag_i)
+ total_frags += len(fragments_list)
+ if fragments_list and (fragments_list[0][1] > frag_i + 1):
+ msg = 'Missed %d fragments' % (fragments_list[0][1] - (frag_i + 1))
+ self.report_warning(msg)
+
+ return self._finish_frag_download(ctx, info_dict)
diff --git a/yt_dlp/downloader/fc2.py b/yt_dlp/downloader/fc2.py
new file mode 100644
index 0000000..f9763de
--- /dev/null
+++ b/yt_dlp/downloader/fc2.py
@@ -0,0 +1,46 @@
+import threading
+
+from .common import FileDownloader
+from .external import FFmpegFD
+
+
+class FC2LiveFD(FileDownloader):
+ """
+ Downloads FC2 live without being stopped. <br>
+ Note, this is not a part of public API, and will be removed without notice.
+ DO NOT USE
+ """
+
+ def real_download(self, filename, info_dict):
+ ws = info_dict['ws']
+
+ heartbeat_lock = threading.Lock()
+ heartbeat_state = [None, 1]
+
+ def heartbeat():
+ if heartbeat_state[1] < 0:
+ return
+
+ try:
+ heartbeat_state[1] += 1
+ ws.send('{"name":"heartbeat","arguments":{},"id":%d}' % heartbeat_state[1])
+ except Exception:
+ self.to_screen('[fc2:live] Heartbeat failed')
+
+ with heartbeat_lock:
+ heartbeat_state[0] = threading.Timer(30, heartbeat)
+ heartbeat_state[0]._daemonic = True
+ heartbeat_state[0].start()
+
+ heartbeat()
+
+ new_info_dict = info_dict.copy()
+ new_info_dict.update({
+ 'ws': None,
+ 'protocol': 'live_ffmpeg',
+ })
+ try:
+ return FFmpegFD(self.ydl, self.params or {}).download(filename, new_info_dict)
+ finally:
+ # stop heartbeating
+ heartbeat_state[1] = -1
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py
new file mode 100644
index 0000000..b4f003d
--- /dev/null
+++ b/yt_dlp/downloader/fragment.py
@@ -0,0 +1,527 @@
+import concurrent.futures
+import contextlib
+import json
+import math
+import os
+import struct
+import time
+
+from .common import FileDownloader
+from .http import HttpFD
+from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
+from ..compat import compat_os_name
+from ..networking import Request
+from ..networking.exceptions import HTTPError, IncompleteRead
+from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj
+from ..utils.networking import HTTPHeaderDict
+from ..utils.progress import ProgressCalculator
+
+
+class HttpQuietDownloader(HttpFD):
+ def to_screen(self, *args, **kargs):
+ pass
+
+ to_console_title = to_screen
+
+
+class FragmentFD(FileDownloader):
+ """
+ A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).
+
+ Available options:
+
+ fragment_retries: Number of times to retry a fragment for HTTP error
+ (DASH and hlsnative only). Default is 0 for API, but 10 for CLI
+ skip_unavailable_fragments:
+ Skip unavailable fragments (DASH and hlsnative only)
+ keep_fragments: Keep downloaded fragments on disk after downloading is
+ finished
+ concurrent_fragment_downloads: The number of threads to use for native hls and dash downloads
+ _no_ytdl_file: Don't use .ytdl file
+
+ For each incomplete fragment download yt-dlp keeps on disk a special
+ bookkeeping file with download state and metadata (in future such files will
+ be used for any incomplete download handled by yt-dlp). This file is
+ used to properly handle resuming, check download file consistency and detect
+ potential errors. The file has a .ytdl extension and represents a standard
+ JSON file of the following format:
+
+ extractor:
+ Dictionary of extractor related data. TBD.
+
+ downloader:
+ Dictionary of downloader related data. May contain following data:
+ current_fragment:
+ Dictionary with current (being downloaded) fragment data:
+ index: 0-based index of current fragment among all fragments
+ fragment_count:
+ Total count of fragments
+
+ This feature is experimental and file format may change in future.
+ """
+
+ def report_retry_fragment(self, err, frag_index, count, retries):
+ self.deprecation_warning('yt_dlp.downloader.FragmentFD.report_retry_fragment is deprecated. '
+ 'Use yt_dlp.downloader.FileDownloader.report_retry instead')
+ return self.report_retry(err, count, retries, frag_index)
+
+ def report_skip_fragment(self, frag_index, err=None):
+ err = f' {err};' if err else ''
+ self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...')
+
+ def _prepare_url(self, info_dict, url):
+ headers = info_dict.get('http_headers')
+ return Request(url, None, headers) if headers else url
+
+ def _prepare_and_start_frag_download(self, ctx, info_dict):
+ self._prepare_frag_download(ctx)
+ self._start_frag_download(ctx, info_dict)
+
+ def __do_ytdl_file(self, ctx):
+ return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')
+
+ def _read_ytdl_file(self, ctx):
+ assert 'ytdl_corrupt' not in ctx
+ stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
+ try:
+ ytdl_data = json.loads(stream.read())
+ ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
+ if 'extra_state' in ytdl_data['downloader']:
+ ctx['extra_state'] = ytdl_data['downloader']['extra_state']
+ except Exception:
+ ctx['ytdl_corrupt'] = True
+ finally:
+ stream.close()
+
+ def _write_ytdl_file(self, ctx):
+ frag_index_stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
+ try:
+ downloader = {
+ 'current_fragment': {
+ 'index': ctx['fragment_index'],
+ },
+ }
+ if 'extra_state' in ctx:
+ downloader['extra_state'] = ctx['extra_state']
+ if ctx.get('fragment_count') is not None:
+ downloader['fragment_count'] = ctx['fragment_count']
+ frag_index_stream.write(json.dumps({'downloader': downloader}))
+ finally:
+ frag_index_stream.close()
+
+ def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
+ fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
+ fragment_info_dict = {
+ 'url': frag_url,
+ 'http_headers': headers or info_dict.get('http_headers'),
+ 'request_data': request_data,
+ 'ctx_id': ctx.get('ctx_id'),
+ }
+ frag_resume_len = 0
+ if ctx['dl'].params.get('continuedl', True):
+ frag_resume_len = self.filesize_or_none(self.temp_name(fragment_filename))
+ fragment_info_dict['frag_resume_len'] = ctx['frag_resume_len'] = frag_resume_len
+
+ success, _ = ctx['dl'].download(fragment_filename, fragment_info_dict)
+ if not success:
+ return False
+ if fragment_info_dict.get('filetime'):
+ ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
+ ctx['fragment_filename_sanitized'] = fragment_filename
+ return True
+
+ def _read_fragment(self, ctx):
+ if not ctx.get('fragment_filename_sanitized'):
+ return None
+ try:
+ down, frag_sanitized = self.sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
+ except FileNotFoundError:
+ if ctx.get('live'):
+ return None
+ raise
+ ctx['fragment_filename_sanitized'] = frag_sanitized
+ frag_content = down.read()
+ down.close()
+ return frag_content
+
+ def _append_fragment(self, ctx, frag_content):
+ try:
+ ctx['dest_stream'].write(frag_content)
+ ctx['dest_stream'].flush()
+ finally:
+ if self.__do_ytdl_file(ctx):
+ self._write_ytdl_file(ctx)
+ if not self.params.get('keep_fragments', False):
+ self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
+ del ctx['fragment_filename_sanitized']
+
+ def _prepare_frag_download(self, ctx):
+ if not ctx.setdefault('live', False):
+ total_frags_str = '%d' % ctx['total_frags']
+ ad_frags = ctx.get('ad_frags', 0)
+ if ad_frags:
+ total_frags_str += ' (not including %d ad)' % ad_frags
+ else:
+ total_frags_str = 'unknown (live)'
+ self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
+ self.report_destination(ctx['filename'])
+ dl = HttpQuietDownloader(self.ydl, {
+ **self.params,
+ 'noprogress': True,
+ 'test': False,
+ 'sleep_interval': 0,
+ 'max_sleep_interval': 0,
+ 'sleep_interval_subtitles': 0,
+ })
+ tmpfilename = self.temp_name(ctx['filename'])
+ open_mode = 'wb'
+
+ # Establish possible resume length
+ resume_len = self.filesize_or_none(tmpfilename)
+ if resume_len > 0:
+ open_mode = 'ab'
+
+ # Should be initialized before ytdl file check
+ ctx.update({
+ 'tmpfilename': tmpfilename,
+ 'fragment_index': 0,
+ })
+
+ if self.__do_ytdl_file(ctx):
+ ytdl_file_exists = os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename'])))
+ continuedl = self.params.get('continuedl', True)
+ if continuedl and ytdl_file_exists:
+ self._read_ytdl_file(ctx)
+ is_corrupt = ctx.get('ytdl_corrupt') is True
+ is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
+ if is_corrupt or is_inconsistent:
+ message = (
+ '.ytdl file is corrupt' if is_corrupt else
+ 'Inconsistent state of incomplete fragment download')
+ self.report_warning(
+ '%s. Restarting from the beginning ...' % message)
+ ctx['fragment_index'] = resume_len = 0
+ if 'ytdl_corrupt' in ctx:
+ del ctx['ytdl_corrupt']
+ self._write_ytdl_file(ctx)
+
+ else:
+ if not continuedl:
+ if ytdl_file_exists:
+ self._read_ytdl_file(ctx)
+ ctx['fragment_index'] = resume_len = 0
+ self._write_ytdl_file(ctx)
+ assert ctx['fragment_index'] == 0
+
+ dest_stream, tmpfilename = self.sanitize_open(tmpfilename, open_mode)
+
+ ctx.update({
+ 'dl': dl,
+ 'dest_stream': dest_stream,
+ 'tmpfilename': tmpfilename,
+ # Total complete fragments downloaded so far in bytes
+ 'complete_frags_downloaded_bytes': resume_len,
+ })
+
+ def _start_frag_download(self, ctx, info_dict):
+ resume_len = ctx['complete_frags_downloaded_bytes']
+ total_frags = ctx['total_frags']
+ ctx_id = ctx.get('ctx_id')
+ # Stores the download progress, updated by the progress hook
+ state = {
+ 'status': 'downloading',
+ 'downloaded_bytes': resume_len,
+ 'fragment_index': ctx['fragment_index'],
+ 'fragment_count': total_frags,
+ 'filename': ctx['filename'],
+ 'tmpfilename': ctx['tmpfilename'],
+ }
+
+ ctx['started'] = time.time()
+ progress = ProgressCalculator(resume_len)
+
+ def frag_progress_hook(s):
+ if s['status'] not in ('downloading', 'finished'):
+ return
+
+ if not total_frags and ctx.get('fragment_count'):
+ state['fragment_count'] = ctx['fragment_count']
+
+ if ctx_id is not None and s.get('ctx_id') != ctx_id:
+ return
+
+ state['max_progress'] = ctx.get('max_progress')
+ state['progress_idx'] = ctx.get('progress_idx')
+
+ state['elapsed'] = progress.elapsed
+ frag_total_bytes = s.get('total_bytes') or 0
+ s['fragment_info_dict'] = s.pop('info_dict', {})
+
+ # XXX: Fragment resume is not accounted for here
+ if not ctx['live']:
+ estimated_size = (
+ (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
+ / (state['fragment_index'] + 1) * total_frags)
+ progress.total = estimated_size
+ progress.update(s.get('downloaded_bytes'))
+ state['total_bytes_estimate'] = progress.total
+ else:
+ progress.update(s.get('downloaded_bytes'))
+
+ if s['status'] == 'finished':
+ state['fragment_index'] += 1
+ ctx['fragment_index'] = state['fragment_index']
+ progress.thread_reset()
+
+ state['downloaded_bytes'] = ctx['complete_frags_downloaded_bytes'] = progress.downloaded
+ state['speed'] = ctx['speed'] = progress.speed.smooth
+ state['eta'] = progress.eta.smooth
+
+ self._hook_progress(state, info_dict)
+
+ ctx['dl'].add_progress_hook(frag_progress_hook)
+
+ return ctx['started']
+
+ def _finish_frag_download(self, ctx, info_dict):
+ ctx['dest_stream'].close()
+ if self.__do_ytdl_file(ctx):
+ self.try_remove(self.ytdl_filename(ctx['filename']))
+ elapsed = time.time() - ctx['started']
+
+ to_file = ctx['tmpfilename'] != '-'
+ if to_file:
+ downloaded_bytes = self.filesize_or_none(ctx['tmpfilename'])
+ else:
+ downloaded_bytes = ctx['complete_frags_downloaded_bytes']
+
+ if not downloaded_bytes:
+ if to_file:
+ self.try_remove(ctx['tmpfilename'])
+ self.report_error('The downloaded file is empty')
+ return False
+ elif to_file:
+ self.try_rename(ctx['tmpfilename'], ctx['filename'])
+ filetime = ctx.get('fragment_filetime')
+ if self.params.get('updatetime', True) and filetime:
+ with contextlib.suppress(Exception):
+ os.utime(ctx['filename'], (time.time(), filetime))
+
+ self._hook_progress({
+ 'downloaded_bytes': downloaded_bytes,
+ 'total_bytes': downloaded_bytes,
+ 'filename': ctx['filename'],
+ 'status': 'finished',
+ 'elapsed': elapsed,
+ 'ctx_id': ctx.get('ctx_id'),
+ 'max_progress': ctx.get('max_progress'),
+ 'progress_idx': ctx.get('progress_idx'),
+ }, info_dict)
+ return True
+
+ def _prepare_external_frag_download(self, ctx):
+ if 'live' not in ctx:
+ ctx['live'] = False
+ if not ctx['live']:
+ total_frags_str = '%d' % ctx['total_frags']
+ ad_frags = ctx.get('ad_frags', 0)
+ if ad_frags:
+ total_frags_str += ' (not including %d ad)' % ad_frags
+ else:
+ total_frags_str = 'unknown (live)'
+ self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
+
+ tmpfilename = self.temp_name(ctx['filename'])
+
+ # Should be initialized before ytdl file check
+ ctx.update({
+ 'tmpfilename': tmpfilename,
+ 'fragment_index': 0,
+ })
+
+ def decrypter(self, info_dict):
+ _key_cache = {}
+
+ def _get_key(url):
+ if url not in _key_cache:
+ _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read()
+ return _key_cache[url]
+
+ def decrypt_fragment(fragment, frag_content):
+ if frag_content is None:
+ return
+ decrypt_info = fragment.get('decrypt_info')
+ if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
+ return frag_content
+ iv = decrypt_info.get('IV') or struct.pack('>8xq', fragment['media_sequence'])
+ decrypt_info['KEY'] = (decrypt_info.get('KEY')
+ or _get_key(traverse_obj(info_dict, ('hls_aes', 'uri')) or decrypt_info['URI']))
+ # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
+ # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
+ # not what it decrypts to.
+ if self.params.get('test', False):
+ return frag_content
+ return unpad_pkcs7(aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv))
+
+ return decrypt_fragment
+
+ def download_and_append_fragments_multiple(self, *args, **kwargs):
+ '''
+ @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
+ all args must be either tuple or list
+ '''
+ interrupt_trigger = [True]
+ max_progress = len(args)
+ if max_progress == 1:
+ return self.download_and_append_fragments(*args[0], **kwargs)
+ max_workers = self.params.get('concurrent_fragment_downloads', 1)
+ if max_progress > 1:
+ self._prepare_multiline_status(max_progress)
+ is_live = any(traverse_obj(args, (..., 2, 'is_live')))
+
+ def thread_func(idx, ctx, fragments, info_dict, tpe):
+ ctx['max_progress'] = max_progress
+ ctx['progress_idx'] = idx
+ return self.download_and_append_fragments(
+ ctx, fragments, info_dict, **kwargs, tpe=tpe, interrupt_trigger=interrupt_trigger)
+
+ class FTPE(concurrent.futures.ThreadPoolExecutor):
+ # has to stop this or it's going to wait on the worker thread itself
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ if compat_os_name == 'nt':
+ def future_result(future):
+ while True:
+ try:
+ return future.result(0.1)
+ except KeyboardInterrupt:
+ raise
+ except concurrent.futures.TimeoutError:
+ continue
+ else:
+ def future_result(future):
+ return future.result()
+
+ def interrupt_trigger_iter(fg):
+ for f in fg:
+ if not interrupt_trigger[0]:
+ break
+ yield f
+
+ spins = []
+ for idx, (ctx, fragments, info_dict) in enumerate(args):
+ tpe = FTPE(math.ceil(max_workers / max_progress))
+ job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe)
+ spins.append((tpe, job))
+
+ result = True
+ for tpe, job in spins:
+ try:
+ result = result and future_result(job)
+ except KeyboardInterrupt:
+ interrupt_trigger[0] = False
+ finally:
+ tpe.shutdown(wait=True)
+ if not interrupt_trigger[0] and not is_live:
+ raise KeyboardInterrupt()
+ # we expect the user wants to stop and DO WANT the preceding postprocessors to run;
+ # so returning a intermediate result here instead of KeyboardInterrupt on live
+ return result
+
+ def download_and_append_fragments(
+ self, ctx, fragments, info_dict, *, is_fatal=(lambda idx: False),
+ pack_func=(lambda content, idx: content), finish_func=None,
+ tpe=None, interrupt_trigger=(True, )):
+
+ if not self.params.get('skip_unavailable_fragments', True):
+ is_fatal = lambda _: True
+
+ def download_fragment(fragment, ctx):
+ if not interrupt_trigger[0]:
+ return
+
+ frag_index = ctx['fragment_index'] = fragment['frag_index']
+ ctx['last_error'] = None
+ headers = HTTPHeaderDict(info_dict.get('http_headers'))
+ byte_range = fragment.get('byte_range')
+ if byte_range:
+ headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
+
+ # Never skip the first fragment
+ fatal = is_fatal(fragment.get('index') or (frag_index - 1))
+
+ def error_callback(err, count, retries):
+ if fatal and count > retries:
+ ctx['dest_stream'].close()
+ self.report_retry(err, count, retries, frag_index, fatal)
+ ctx['last_error'] = err
+
+ for retry in RetryManager(self.params.get('fragment_retries'), error_callback):
+ try:
+ ctx['fragment_count'] = fragment.get('fragment_count')
+ if not self._download_fragment(
+ ctx, fragment['url'], info_dict, headers, info_dict.get('request_data')):
+ return
+ except (HTTPError, IncompleteRead) as err:
+ retry.error = err
+ continue
+ except DownloadError: # has own retry settings
+ if fatal:
+ raise
+
+ def append_fragment(frag_content, frag_index, ctx):
+ if frag_content:
+ self._append_fragment(ctx, pack_func(frag_content, frag_index))
+ elif not is_fatal(frag_index - 1):
+ self.report_skip_fragment(frag_index, 'fragment not found')
+ else:
+ ctx['dest_stream'].close()
+ self.report_error(f'fragment {frag_index} not found, unable to continue')
+ return False
+ return True
+
+ decrypt_fragment = self.decrypter(info_dict)
+
+ max_workers = math.ceil(
+ self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
+ if max_workers > 1:
+ def _download_fragment(fragment):
+ ctx_copy = ctx.copy()
+ download_fragment(fragment, ctx_copy)
+ return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized')
+
+ with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+ try:
+ for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments):
+ ctx.update({
+ 'fragment_filename_sanitized': frag_filename,
+ 'fragment_index': frag_index,
+ })
+ if not append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx):
+ return False
+ except KeyboardInterrupt:
+ self._finish_multiline_status()
+ self.report_error(
+ 'Interrupted by user. Waiting for all threads to shutdown...', is_error=False, tb=False)
+ pool.shutdown(wait=False)
+ raise
+ else:
+ for fragment in fragments:
+ if not interrupt_trigger[0]:
+ break
+ try:
+ download_fragment(fragment, ctx)
+ result = append_fragment(
+ decrypt_fragment(fragment, self._read_fragment(ctx)), fragment['frag_index'], ctx)
+ except KeyboardInterrupt:
+ if info_dict.get('is_live'):
+ break
+ raise
+ if not result:
+ return False
+
+ if finish_func is not None:
+ ctx['dest_stream'].write(finish_func())
+ ctx['dest_stream'].flush()
+ return self._finish_frag_download(ctx, info_dict)
diff --git a/yt_dlp/downloader/hls.py b/yt_dlp/downloader/hls.py
new file mode 100644
index 0000000..4ac5d99
--- /dev/null
+++ b/yt_dlp/downloader/hls.py
@@ -0,0 +1,378 @@
+import binascii
+import io
+import re
+import urllib.parse
+
+from . import get_suitable_downloader
+from .external import FFmpegFD
+from .fragment import FragmentFD
+from .. import webvtt
+from ..dependencies import Cryptodome
+from ..utils import (
+ bug_reports_message,
+ parse_m3u8_attributes,
+ remove_start,
+ traverse_obj,
+ update_url_query,
+ urljoin,
+)
+
+
+class HlsFD(FragmentFD):
+ """
+ Download segments in a m3u8 manifest. External downloaders can take over
+ the fragment downloads by supporting the 'm3u8_frag_urls' protocol and
+ re-defining 'supports_manifest' function
+ """
+
+ FD_NAME = 'hlsnative'
+
+ @staticmethod
+ def _has_drm(manifest): # TODO: https://github.com/yt-dlp/yt-dlp/pull/5039
+ return bool(re.search('|'.join((
+ r'#EXT-X-(?:SESSION-)?KEY:.*?URI="skd://', # Apple FairPlay
+ r'#EXT-X-(?:SESSION-)?KEY:.*?KEYFORMAT="com\.apple\.streamingkeydelivery"', # Apple FairPlay
+ r'#EXT-X-(?:SESSION-)?KEY:.*?KEYFORMAT="com\.microsoft\.playready"', # Microsoft PlayReady
+ r'#EXT-X-FAXS-CM:', # Adobe Flash Access
+ )), manifest))
+
+ @classmethod
+ def can_download(cls, manifest, info_dict, allow_unplayable_formats=False):
+ UNSUPPORTED_FEATURES = [
+ # r'#EXT-X-BYTERANGE', # playlists composed of byte ranges of media files [2]
+
+ # Live streams heuristic does not always work (e.g. geo restricted to Germany
+ # http://hls-geo.daserste.de/i/videoportal/Film/c_620000/622873/format,716451,716457,716450,716458,716459,.mp4.csmil/index_4_av.m3u8?null=0)
+ # r'#EXT-X-MEDIA-SEQUENCE:(?!0$)', # live streams [3]
+
+ # This heuristic also is not correct since segments may not be appended as well.
+ # Twitch vods of finished streams have EXT-X-PLAYLIST-TYPE:EVENT despite
+ # no segments will definitely be appended to the end of the playlist.
+ # r'#EXT-X-PLAYLIST-TYPE:EVENT', # media segments may be appended to the end of
+ # # event media playlists [4]
+ # r'#EXT-X-MAP:', # media initialization [5]
+ # 1. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.4
+ # 2. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.2
+ # 3. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.3.2
+ # 4. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.3.5
+ # 5. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.5
+ ]
+ if not allow_unplayable_formats:
+ UNSUPPORTED_FEATURES += [
+ r'#EXT-X-KEY:METHOD=(?!NONE|AES-128)', # encrypted streams [1], but not necessarily DRM
+ ]
+
+ def check_results():
+ yield not info_dict.get('is_live')
+ for feature in UNSUPPORTED_FEATURES:
+ yield not re.search(feature, manifest)
+ if not allow_unplayable_formats:
+ yield not cls._has_drm(manifest)
+ return all(check_results())
+
+ def real_download(self, filename, info_dict):
+ man_url = info_dict['url']
+ self.to_screen('[%s] Downloading m3u8 manifest' % self.FD_NAME)
+
+ urlh = self.ydl.urlopen(self._prepare_url(info_dict, man_url))
+ man_url = urlh.url
+ s = urlh.read().decode('utf-8', 'ignore')
+
+ can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None
+ if can_download:
+ has_ffmpeg = FFmpegFD.available()
+ no_crypto = not Cryptodome.AES and '#EXT-X-KEY:METHOD=AES-128' in s
+ if no_crypto and has_ffmpeg:
+ can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available'
+ elif no_crypto:
+ message = ('The stream has AES-128 encryption and neither ffmpeg nor pycryptodomex are available; '
+ 'Decryption will be performed natively, but will be extremely slow')
+ elif info_dict.get('extractor_key') == 'Generic' and re.search(r'(?m)#EXT-X-MEDIA-SEQUENCE:(?!0$)', s):
+ install_ffmpeg = '' if has_ffmpeg else 'install ffmpeg and '
+ message = ('Live HLS streams are not supported by the native downloader. If this is a livestream, '
+ f'please {install_ffmpeg}add "--downloader ffmpeg --hls-use-mpegts" to your command')
+ if not can_download:
+ if self._has_drm(s) and not self.params.get('allow_unplayable_formats'):
+ if info_dict.get('has_drm') and self.params.get('test'):
+ self.to_screen(f'[{self.FD_NAME}] This format is DRM protected', skip_eol=True)
+ else:
+ self.report_error(
+ 'This format is DRM protected; Try selecting another format with --format or '
+ 'add --check-formats to automatically fallback to the next best format', tb=False)
+ return False
+ message = message or 'Unsupported features have been detected'
+ fd = FFmpegFD(self.ydl, self.params)
+ self.report_warning(f'{message}; extraction will be delegated to {fd.get_basename()}')
+ return fd.real_download(filename, info_dict)
+ elif message:
+ self.report_warning(message)
+
+ is_webvtt = info_dict['ext'] == 'vtt'
+ if is_webvtt:
+ real_downloader = None # Packing the fragments is not currently supported for external downloader
+ else:
+ real_downloader = get_suitable_downloader(
+ info_dict, self.params, None, protocol='m3u8_frag_urls', to_stdout=(filename == '-'))
+ if real_downloader and not real_downloader.supports_manifest(s):
+ real_downloader = None
+ if real_downloader:
+ self.to_screen(f'[{self.FD_NAME}] Fragment downloads will be delegated to {real_downloader.get_basename()}')
+
+ def is_ad_fragment_start(s):
+ return (s.startswith('#ANVATO-SEGMENT-INFO') and 'type=ad' in s
+ or s.startswith('#UPLYNK-SEGMENT') and s.endswith(',ad'))
+
+ def is_ad_fragment_end(s):
+ return (s.startswith('#ANVATO-SEGMENT-INFO') and 'type=master' in s
+ or s.startswith('#UPLYNK-SEGMENT') and s.endswith(',segment'))
+
+ fragments = []
+
+ media_frags = 0
+ ad_frags = 0
+ ad_frag_next = False
+ for line in s.splitlines():
+ line = line.strip()
+ if not line:
+ continue
+ if line.startswith('#'):
+ if is_ad_fragment_start(line):
+ ad_frag_next = True
+ elif is_ad_fragment_end(line):
+ ad_frag_next = False
+ continue
+ if ad_frag_next:
+ ad_frags += 1
+ continue
+ media_frags += 1
+
+ ctx = {
+ 'filename': filename,
+ 'total_frags': media_frags,
+ 'ad_frags': ad_frags,
+ }
+
+ if real_downloader:
+ self._prepare_external_frag_download(ctx)
+ else:
+ self._prepare_and_start_frag_download(ctx, info_dict)
+
+ extra_state = ctx.setdefault('extra_state', {})
+
+ format_index = info_dict.get('format_index')
+ extra_query = None
+ extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url')
+ if extra_param_to_segment_url:
+ extra_query = urllib.parse.parse_qs(extra_param_to_segment_url)
+ i = 0
+ media_sequence = 0
+ decrypt_info = {'METHOD': 'NONE'}
+ external_aes_key = traverse_obj(info_dict, ('hls_aes', 'key'))
+ if external_aes_key:
+ external_aes_key = binascii.unhexlify(remove_start(external_aes_key, '0x'))
+ assert len(external_aes_key) in (16, 24, 32), 'Invalid length for HLS AES-128 key'
+ external_aes_iv = traverse_obj(info_dict, ('hls_aes', 'iv'))
+ if external_aes_iv:
+ external_aes_iv = binascii.unhexlify(remove_start(external_aes_iv, '0x').zfill(32))
+ byte_range = {}
+ discontinuity_count = 0
+ frag_index = 0
+ ad_frag_next = False
+ for line in s.splitlines():
+ line = line.strip()
+ if line:
+ if not line.startswith('#'):
+ if format_index and discontinuity_count != format_index:
+ continue
+ if ad_frag_next:
+ continue
+ frag_index += 1
+ if frag_index <= ctx['fragment_index']:
+ continue
+ frag_url = urljoin(man_url, line)
+ if extra_query:
+ frag_url = update_url_query(frag_url, extra_query)
+
+ fragments.append({
+ 'frag_index': frag_index,
+ 'url': frag_url,
+ 'decrypt_info': decrypt_info,
+ 'byte_range': byte_range,
+ 'media_sequence': media_sequence,
+ })
+ media_sequence += 1
+
+ elif line.startswith('#EXT-X-MAP'):
+ if format_index and discontinuity_count != format_index:
+ continue
+ if frag_index > 0:
+ self.report_error(
+ 'Initialization fragment found after media fragments, unable to download')
+ return False
+ frag_index += 1
+ map_info = parse_m3u8_attributes(line[11:])
+ frag_url = urljoin(man_url, map_info.get('URI'))
+ if extra_query:
+ frag_url = update_url_query(frag_url, extra_query)
+
+ if map_info.get('BYTERANGE'):
+ splitted_byte_range = map_info.get('BYTERANGE').split('@')
+ sub_range_start = int(splitted_byte_range[1]) if len(splitted_byte_range) == 2 else byte_range['end']
+ byte_range = {
+ 'start': sub_range_start,
+ 'end': sub_range_start + int(splitted_byte_range[0]),
+ }
+
+ fragments.append({
+ 'frag_index': frag_index,
+ 'url': frag_url,
+ 'decrypt_info': decrypt_info,
+ 'byte_range': byte_range,
+ 'media_sequence': media_sequence
+ })
+ media_sequence += 1
+
+ elif line.startswith('#EXT-X-KEY'):
+ decrypt_url = decrypt_info.get('URI')
+ decrypt_info = parse_m3u8_attributes(line[11:])
+ if decrypt_info['METHOD'] == 'AES-128':
+ if external_aes_iv:
+ decrypt_info['IV'] = external_aes_iv
+ elif 'IV' in decrypt_info:
+ decrypt_info['IV'] = binascii.unhexlify(decrypt_info['IV'][2:].zfill(32))
+ if external_aes_key:
+ decrypt_info['KEY'] = external_aes_key
+ else:
+ decrypt_info['URI'] = urljoin(man_url, decrypt_info['URI'])
+ if extra_query:
+ decrypt_info['URI'] = update_url_query(decrypt_info['URI'], extra_query)
+ if decrypt_url != decrypt_info['URI']:
+ decrypt_info['KEY'] = None
+
+ elif line.startswith('#EXT-X-MEDIA-SEQUENCE'):
+ media_sequence = int(line[22:])
+ elif line.startswith('#EXT-X-BYTERANGE'):
+ splitted_byte_range = line[17:].split('@')
+ sub_range_start = int(splitted_byte_range[1]) if len(splitted_byte_range) == 2 else byte_range['end']
+ byte_range = {
+ 'start': sub_range_start,
+ 'end': sub_range_start + int(splitted_byte_range[0]),
+ }
+ elif is_ad_fragment_start(line):
+ ad_frag_next = True
+ elif is_ad_fragment_end(line):
+ ad_frag_next = False
+ elif line.startswith('#EXT-X-DISCONTINUITY'):
+ discontinuity_count += 1
+ i += 1
+
+ # We only download the first fragment during the test
+ if self.params.get('test', False):
+ fragments = [fragments[0] if fragments else None]
+
+ if real_downloader:
+ info_dict['fragments'] = fragments
+ fd = real_downloader(self.ydl, self.params)
+ # TODO: Make progress updates work without hooking twice
+ # for ph in self._progress_hooks:
+ # fd.add_progress_hook(ph)
+ return fd.real_download(filename, info_dict)
+
+ if is_webvtt:
+ def pack_fragment(frag_content, frag_index):
+ output = io.StringIO()
+ adjust = 0
+ overflow = False
+ mpegts_last = None
+ for block in webvtt.parse_fragment(frag_content):
+ if isinstance(block, webvtt.CueBlock):
+ extra_state['webvtt_mpegts_last'] = mpegts_last
+ if overflow:
+ extra_state['webvtt_mpegts_adjust'] += 1
+ overflow = False
+ block.start += adjust
+ block.end += adjust
+
+ dedup_window = extra_state.setdefault('webvtt_dedup_window', [])
+
+ ready = []
+
+ i = 0
+ is_new = True
+ while i < len(dedup_window):
+ wcue = dedup_window[i]
+ wblock = webvtt.CueBlock.from_json(wcue)
+ i += 1
+ if wblock.hinges(block):
+ wcue['end'] = block.end
+ is_new = False
+ continue
+ if wblock == block:
+ is_new = False
+ continue
+ if wblock.end > block.start:
+ continue
+ ready.append(wblock)
+ i -= 1
+ del dedup_window[i]
+
+ if is_new:
+ dedup_window.append(block.as_json)
+ for block in ready:
+ block.write_into(output)
+
+ # we only emit cues once they fall out of the duplicate window
+ continue
+ elif isinstance(block, webvtt.Magic):
+ # take care of MPEG PES timestamp overflow
+ if block.mpegts is None:
+ block.mpegts = 0
+ extra_state.setdefault('webvtt_mpegts_adjust', 0)
+ block.mpegts += extra_state['webvtt_mpegts_adjust'] << 33
+ if block.mpegts < extra_state.get('webvtt_mpegts_last', 0):
+ overflow = True
+ block.mpegts += 1 << 33
+ mpegts_last = block.mpegts
+
+ if frag_index == 1:
+ extra_state['webvtt_mpegts'] = block.mpegts or 0
+ extra_state['webvtt_local'] = block.local or 0
+ # XXX: block.local = block.mpegts = None ?
+ else:
+ if block.mpegts is not None and block.local is not None:
+ adjust = (
+ (block.mpegts - extra_state.get('webvtt_mpegts', 0))
+ - (block.local - extra_state.get('webvtt_local', 0))
+ )
+ continue
+ elif isinstance(block, webvtt.HeaderBlock):
+ if frag_index != 1:
+ # XXX: this should probably be silent as well
+ # or verify that all segments contain the same data
+ self.report_warning(bug_reports_message(
+ 'Discarding a %s block found in the middle of the stream; '
+ 'if the subtitles display incorrectly,'
+ % (type(block).__name__)))
+ continue
+ block.write_into(output)
+
+ return output.getvalue().encode()
+
+ def fin_fragments():
+ dedup_window = extra_state.get('webvtt_dedup_window')
+ if not dedup_window:
+ return b''
+
+ output = io.StringIO()
+ for cue in dedup_window:
+ webvtt.CueBlock.from_json(cue).write_into(output)
+
+ return output.getvalue().encode()
+
+ if len(fragments) == 1:
+ self.download_and_append_fragments(ctx, fragments, info_dict)
+ else:
+ self.download_and_append_fragments(
+ ctx, fragments, info_dict, pack_func=pack_fragment, finish_func=fin_fragments)
+ else:
+ return self.download_and_append_fragments(ctx, fragments, info_dict)
diff --git a/yt_dlp/downloader/http.py b/yt_dlp/downloader/http.py
new file mode 100644
index 0000000..693828b
--- /dev/null
+++ b/yt_dlp/downloader/http.py
@@ -0,0 +1,383 @@
+import os
+import random
+import time
+
+from .common import FileDownloader
+from ..networking import Request
+from ..networking.exceptions import (
+ CertificateVerifyError,
+ HTTPError,
+ TransportError,
+)
+from ..utils import (
+ ContentTooShortError,
+ RetryManager,
+ ThrottledDownload,
+ XAttrMetadataError,
+ XAttrUnavailableError,
+ encodeFilename,
+ int_or_none,
+ parse_http_range,
+ try_call,
+ write_xattr,
+)
+from ..utils.networking import HTTPHeaderDict
+
+
+class HttpFD(FileDownloader):
+ def real_download(self, filename, info_dict):
+ url = info_dict['url']
+ request_data = info_dict.get('request_data', None)
+
+ class DownloadContext(dict):
+ __getattr__ = dict.get
+ __setattr__ = dict.__setitem__
+ __delattr__ = dict.__delitem__
+
+ ctx = DownloadContext()
+ ctx.filename = filename
+ ctx.tmpfilename = self.temp_name(filename)
+ ctx.stream = None
+
+ # Disable compression
+ headers = HTTPHeaderDict({'Accept-Encoding': 'identity'}, info_dict.get('http_headers'))
+
+ is_test = self.params.get('test', False)
+ chunk_size = self._TEST_FILE_SIZE if is_test else (
+ self.params.get('http_chunk_size')
+ or info_dict.get('downloader_options', {}).get('http_chunk_size')
+ or 0)
+
+ ctx.open_mode = 'wb'
+ ctx.resume_len = 0
+ ctx.block_size = self.params.get('buffersize', 1024)
+ ctx.start_time = time.time()
+
+ # parse given Range
+ req_start, req_end, _ = parse_http_range(headers.get('Range'))
+
+ if self.params.get('continuedl', True):
+ # Establish possible resume length
+ if os.path.isfile(encodeFilename(ctx.tmpfilename)):
+ ctx.resume_len = os.path.getsize(
+ encodeFilename(ctx.tmpfilename))
+
+ ctx.is_resume = ctx.resume_len > 0
+
+ class SucceedDownload(Exception):
+ pass
+
+ class RetryDownload(Exception):
+ def __init__(self, source_error):
+ self.source_error = source_error
+
+ class NextFragment(Exception):
+ pass
+
+ def establish_connection():
+ ctx.chunk_size = (random.randint(int(chunk_size * 0.95), chunk_size)
+ if not is_test and chunk_size else chunk_size)
+ if ctx.resume_len > 0:
+ range_start = ctx.resume_len
+ if req_start is not None:
+ # offset the beginning of Range to be within request
+ range_start += req_start
+ if ctx.is_resume:
+ self.report_resuming_byte(ctx.resume_len)
+ ctx.open_mode = 'ab'
+ elif req_start is not None:
+ range_start = req_start
+ elif ctx.chunk_size > 0:
+ range_start = 0
+ else:
+ range_start = None
+ ctx.is_resume = False
+
+ if ctx.chunk_size:
+ chunk_aware_end = range_start + ctx.chunk_size - 1
+ # we're not allowed to download outside Range
+ range_end = chunk_aware_end if req_end is None else min(chunk_aware_end, req_end)
+ elif req_end is not None:
+ # there's no need for chunked downloads, so download until the end of Range
+ range_end = req_end
+ else:
+ range_end = None
+
+ if try_call(lambda: range_start > range_end):
+ ctx.resume_len = 0
+ ctx.open_mode = 'wb'
+ raise RetryDownload(Exception(f'Conflicting range. (start={range_start} > end={range_end})'))
+
+ if try_call(lambda: range_end >= ctx.content_len):
+ range_end = ctx.content_len - 1
+
+ request = Request(url, request_data, headers)
+ has_range = range_start is not None
+ if has_range:
+ request.headers['Range'] = f'bytes={int(range_start)}-{int_or_none(range_end) or ""}'
+ # Establish connection
+ try:
+ ctx.data = self.ydl.urlopen(request)
+ # When trying to resume, Content-Range HTTP header of response has to be checked
+ # to match the value of requested Range HTTP header. This is due to a webservers
+ # that don't support resuming and serve a whole file with no Content-Range
+ # set in response despite of requested Range (see
+ # https://github.com/ytdl-org/youtube-dl/issues/6057#issuecomment-126129799)
+ if has_range:
+ content_range = ctx.data.headers.get('Content-Range')
+ content_range_start, content_range_end, content_len = parse_http_range(content_range)
+ # Content-Range is present and matches requested Range, resume is possible
+ if range_start == content_range_start and (
+ # Non-chunked download
+ not ctx.chunk_size
+ # Chunked download and requested piece or
+ # its part is promised to be served
+ or content_range_end == range_end
+ or content_len < range_end):
+ ctx.content_len = content_len
+ if content_len or req_end:
+ ctx.data_len = min(content_len or req_end, req_end or content_len) - (req_start or 0)
+ return
+ # Content-Range is either not present or invalid. Assuming remote webserver is
+ # trying to send the whole file, resume is not possible, so wiping the local file
+ # and performing entire redownload
+ elif range_start > 0:
+ self.report_unable_to_resume()
+ ctx.resume_len = 0
+ ctx.open_mode = 'wb'
+ ctx.data_len = ctx.content_len = int_or_none(ctx.data.headers.get('Content-length', None))
+ except HTTPError as err:
+ if err.status == 416:
+ # Unable to resume (requested range not satisfiable)
+ try:
+ # Open the connection again without the range header
+ ctx.data = self.ydl.urlopen(
+ Request(url, request_data, headers))
+ content_length = ctx.data.headers['Content-Length']
+ except HTTPError as err:
+ if err.status < 500 or err.status >= 600:
+ raise
+ else:
+ # Examine the reported length
+ if (content_length is not None
+ and (ctx.resume_len - 100 < int(content_length) < ctx.resume_len + 100)):
+ # The file had already been fully downloaded.
+ # Explanation to the above condition: in issue #175 it was revealed that
+ # YouTube sometimes adds or removes a few bytes from the end of the file,
+ # changing the file size slightly and causing problems for some users. So
+ # I decided to implement a suggested change and consider the file
+ # completely downloaded if the file size differs less than 100 bytes from
+ # the one in the hard drive.
+ self.report_file_already_downloaded(ctx.filename)
+ self.try_rename(ctx.tmpfilename, ctx.filename)
+ self._hook_progress({
+ 'filename': ctx.filename,
+ 'status': 'finished',
+ 'downloaded_bytes': ctx.resume_len,
+ 'total_bytes': ctx.resume_len,
+ }, info_dict)
+ raise SucceedDownload()
+ else:
+ # The length does not match, we start the download over
+ self.report_unable_to_resume()
+ ctx.resume_len = 0
+ ctx.open_mode = 'wb'
+ return
+ elif err.status < 500 or err.status >= 600:
+ # Unexpected HTTP error
+ raise
+ raise RetryDownload(err)
+ except CertificateVerifyError:
+ raise
+ except TransportError as err:
+ raise RetryDownload(err)
+
+ def close_stream():
+ if ctx.stream is not None:
+ if not ctx.tmpfilename == '-':
+ ctx.stream.close()
+ ctx.stream = None
+
+ def download():
+ data_len = ctx.data.headers.get('Content-length')
+
+ if ctx.data.headers.get('Content-encoding'):
+ # Content-encoding is present, Content-length is not reliable anymore as we are
+ # doing auto decompression. (See: https://github.com/yt-dlp/yt-dlp/pull/6176)
+ data_len = None
+
+ # Range HTTP header may be ignored/unsupported by a webserver
+ # (e.g. extractor/scivee.py, extractor/bambuser.py).
+ # However, for a test we still would like to download just a piece of a file.
+ # To achieve this we limit data_len to _TEST_FILE_SIZE and manually control
+ # block size when downloading a file.
+ if is_test and (data_len is None or int(data_len) > self._TEST_FILE_SIZE):
+ data_len = self._TEST_FILE_SIZE
+
+ if data_len is not None:
+ data_len = int(data_len) + ctx.resume_len
+ min_data_len = self.params.get('min_filesize')
+ max_data_len = self.params.get('max_filesize')
+ if min_data_len is not None and data_len < min_data_len:
+ self.to_screen(
+ f'\r[download] File is smaller than min-filesize ({data_len} bytes < {min_data_len} bytes). Aborting.')
+ return False
+ if max_data_len is not None and data_len > max_data_len:
+ self.to_screen(
+ f'\r[download] File is larger than max-filesize ({data_len} bytes > {max_data_len} bytes). Aborting.')
+ return False
+
+ byte_counter = 0 + ctx.resume_len
+ block_size = ctx.block_size
+ start = time.time()
+
+ # measure time over whole while-loop, so slow_down() and best_block_size() work together properly
+ now = None # needed for slow_down() in the first loop run
+ before = start # start measuring
+
+ def retry(e):
+ close_stream()
+ if ctx.tmpfilename == '-':
+ ctx.resume_len = byte_counter
+ else:
+ try:
+ ctx.resume_len = os.path.getsize(encodeFilename(ctx.tmpfilename))
+ except FileNotFoundError:
+ ctx.resume_len = 0
+ raise RetryDownload(e)
+
+ while True:
+ try:
+ # Download and write
+ data_block = ctx.data.read(block_size if not is_test else min(block_size, data_len - byte_counter))
+ except TransportError as err:
+ retry(err)
+
+ byte_counter += len(data_block)
+
+ # exit loop when download is finished
+ if len(data_block) == 0:
+ break
+
+ # Open destination file just in time
+ if ctx.stream is None:
+ try:
+ ctx.stream, ctx.tmpfilename = self.sanitize_open(
+ ctx.tmpfilename, ctx.open_mode)
+ assert ctx.stream is not None
+ ctx.filename = self.undo_temp_name(ctx.tmpfilename)
+ self.report_destination(ctx.filename)
+ except OSError as err:
+ self.report_error('unable to open for writing: %s' % str(err))
+ return False
+
+ if self.params.get('xattr_set_filesize', False) and data_len is not None:
+ try:
+ write_xattr(ctx.tmpfilename, 'user.ytdl.filesize', str(data_len).encode())
+ except (XAttrUnavailableError, XAttrMetadataError) as err:
+ self.report_error('unable to set filesize xattr: %s' % str(err))
+
+ try:
+ ctx.stream.write(data_block)
+ except OSError as err:
+ self.to_stderr('\n')
+ self.report_error('unable to write data: %s' % str(err))
+ return False
+
+ # Apply rate limit
+ self.slow_down(start, now, byte_counter - ctx.resume_len)
+
+ # end measuring of one loop run
+ now = time.time()
+ after = now
+
+ # Adjust block size
+ if not self.params.get('noresizebuffer', False):
+ block_size = self.best_block_size(after - before, len(data_block))
+
+ before = after
+
+ # Progress message
+ speed = self.calc_speed(start, now, byte_counter - ctx.resume_len)
+ if ctx.data_len is None:
+ eta = None
+ else:
+ eta = self.calc_eta(start, time.time(), ctx.data_len - ctx.resume_len, byte_counter - ctx.resume_len)
+
+ self._hook_progress({
+ 'status': 'downloading',
+ 'downloaded_bytes': byte_counter,
+ 'total_bytes': ctx.data_len,
+ 'tmpfilename': ctx.tmpfilename,
+ 'filename': ctx.filename,
+ 'eta': eta,
+ 'speed': speed,
+ 'elapsed': now - ctx.start_time,
+ 'ctx_id': info_dict.get('ctx_id'),
+ }, info_dict)
+
+ if data_len is not None and byte_counter == data_len:
+ break
+
+ if speed and speed < (self.params.get('throttledratelimit') or 0):
+ # The speed must stay below the limit for 3 seconds
+ # This prevents raising error when the speed temporarily goes down
+ if ctx.throttle_start is None:
+ ctx.throttle_start = now
+ elif now - ctx.throttle_start > 3:
+ if ctx.stream is not None and ctx.tmpfilename != '-':
+ ctx.stream.close()
+ raise ThrottledDownload()
+ elif speed:
+ ctx.throttle_start = None
+
+ if ctx.stream is None:
+ self.to_stderr('\n')
+ self.report_error('Did not get any data blocks')
+ return False
+
+ if not is_test and ctx.chunk_size and ctx.content_len is not None and byte_counter < ctx.content_len:
+ ctx.resume_len = byte_counter
+ raise NextFragment()
+
+ if ctx.tmpfilename != '-':
+ ctx.stream.close()
+
+ if data_len is not None and byte_counter != data_len:
+ err = ContentTooShortError(byte_counter, int(data_len))
+ retry(err)
+
+ self.try_rename(ctx.tmpfilename, ctx.filename)
+
+ # Update file modification time
+ if self.params.get('updatetime', True):
+ info_dict['filetime'] = self.try_utime(ctx.filename, ctx.data.headers.get('last-modified', None))
+
+ self._hook_progress({
+ 'downloaded_bytes': byte_counter,
+ 'total_bytes': byte_counter,
+ 'filename': ctx.filename,
+ 'status': 'finished',
+ 'elapsed': time.time() - ctx.start_time,
+ 'ctx_id': info_dict.get('ctx_id'),
+ }, info_dict)
+
+ return True
+
+ for retry in RetryManager(self.params.get('retries'), self.report_retry):
+ try:
+ establish_connection()
+ return download()
+ except RetryDownload as err:
+ retry.error = err.source_error
+ continue
+ except NextFragment:
+ retry.error = None
+ retry.attempt -= 1
+ continue
+ except SucceedDownload:
+ return True
+ except: # noqa: E722
+ close_stream()
+ raise
+ return False
diff --git a/yt_dlp/downloader/ism.py b/yt_dlp/downloader/ism.py
new file mode 100644
index 0000000..dd688f5
--- /dev/null
+++ b/yt_dlp/downloader/ism.py
@@ -0,0 +1,283 @@
+import binascii
+import io
+import struct
+import time
+
+from .fragment import FragmentFD
+from ..networking.exceptions import HTTPError
+from ..utils import RetryManager
+
+u8 = struct.Struct('>B')
+u88 = struct.Struct('>Bx')
+u16 = struct.Struct('>H')
+u1616 = struct.Struct('>Hxx')
+u32 = struct.Struct('>I')
+u64 = struct.Struct('>Q')
+
+s88 = struct.Struct('>bx')
+s16 = struct.Struct('>h')
+s1616 = struct.Struct('>hxx')
+s32 = struct.Struct('>i')
+
+unity_matrix = (s32.pack(0x10000) + s32.pack(0) * 3) * 2 + s32.pack(0x40000000)
+
+TRACK_ENABLED = 0x1
+TRACK_IN_MOVIE = 0x2
+TRACK_IN_PREVIEW = 0x4
+
+SELF_CONTAINED = 0x1
+
+
+def box(box_type, payload):
+ return u32.pack(8 + len(payload)) + box_type + payload
+
+
+def full_box(box_type, version, flags, payload):
+ return box(box_type, u8.pack(version) + u32.pack(flags)[1:] + payload)
+
+
+def write_piff_header(stream, params):
+ track_id = params['track_id']
+ fourcc = params['fourcc']
+ duration = params['duration']
+ timescale = params.get('timescale', 10000000)
+ language = params.get('language', 'und')
+ height = params.get('height', 0)
+ width = params.get('width', 0)
+ stream_type = params['stream_type']
+ creation_time = modification_time = int(time.time())
+
+ ftyp_payload = b'isml' # major brand
+ ftyp_payload += u32.pack(1) # minor version
+ ftyp_payload += b'piff' + b'iso2' # compatible brands
+ stream.write(box(b'ftyp', ftyp_payload)) # File Type Box
+
+ mvhd_payload = u64.pack(creation_time)
+ mvhd_payload += u64.pack(modification_time)
+ mvhd_payload += u32.pack(timescale)
+ mvhd_payload += u64.pack(duration)
+ mvhd_payload += s1616.pack(1) # rate
+ mvhd_payload += s88.pack(1) # volume
+ mvhd_payload += u16.pack(0) # reserved
+ mvhd_payload += u32.pack(0) * 2 # reserved
+ mvhd_payload += unity_matrix
+ mvhd_payload += u32.pack(0) * 6 # pre defined
+ mvhd_payload += u32.pack(0xffffffff) # next track id
+ moov_payload = full_box(b'mvhd', 1, 0, mvhd_payload) # Movie Header Box
+
+ tkhd_payload = u64.pack(creation_time)
+ tkhd_payload += u64.pack(modification_time)
+ tkhd_payload += u32.pack(track_id) # track id
+ tkhd_payload += u32.pack(0) # reserved
+ tkhd_payload += u64.pack(duration)
+ tkhd_payload += u32.pack(0) * 2 # reserved
+ tkhd_payload += s16.pack(0) # layer
+ tkhd_payload += s16.pack(0) # alternate group
+ tkhd_payload += s88.pack(1 if stream_type == 'audio' else 0) # volume
+ tkhd_payload += u16.pack(0) # reserved
+ tkhd_payload += unity_matrix
+ tkhd_payload += u1616.pack(width)
+ tkhd_payload += u1616.pack(height)
+ trak_payload = full_box(b'tkhd', 1, TRACK_ENABLED | TRACK_IN_MOVIE | TRACK_IN_PREVIEW, tkhd_payload) # Track Header Box
+
+ mdhd_payload = u64.pack(creation_time)
+ mdhd_payload += u64.pack(modification_time)
+ mdhd_payload += u32.pack(timescale)
+ mdhd_payload += u64.pack(duration)
+ mdhd_payload += u16.pack(((ord(language[0]) - 0x60) << 10) | ((ord(language[1]) - 0x60) << 5) | (ord(language[2]) - 0x60))
+ mdhd_payload += u16.pack(0) # pre defined
+ mdia_payload = full_box(b'mdhd', 1, 0, mdhd_payload) # Media Header Box
+
+ hdlr_payload = u32.pack(0) # pre defined
+ if stream_type == 'audio': # handler type
+ hdlr_payload += b'soun'
+ hdlr_payload += u32.pack(0) * 3 # reserved
+ hdlr_payload += b'SoundHandler\0' # name
+ elif stream_type == 'video':
+ hdlr_payload += b'vide'
+ hdlr_payload += u32.pack(0) * 3 # reserved
+ hdlr_payload += b'VideoHandler\0' # name
+ elif stream_type == 'text':
+ hdlr_payload += b'subt'
+ hdlr_payload += u32.pack(0) * 3 # reserved
+ hdlr_payload += b'SubtitleHandler\0' # name
+ else:
+ assert False
+ mdia_payload += full_box(b'hdlr', 0, 0, hdlr_payload) # Handler Reference Box
+
+ if stream_type == 'audio':
+ smhd_payload = s88.pack(0) # balance
+ smhd_payload += u16.pack(0) # reserved
+ media_header_box = full_box(b'smhd', 0, 0, smhd_payload) # Sound Media Header
+ elif stream_type == 'video':
+ vmhd_payload = u16.pack(0) # graphics mode
+ vmhd_payload += u16.pack(0) * 3 # opcolor
+ media_header_box = full_box(b'vmhd', 0, 1, vmhd_payload) # Video Media Header
+ elif stream_type == 'text':
+ media_header_box = full_box(b'sthd', 0, 0, b'') # Subtitle Media Header
+ else:
+ assert False
+ minf_payload = media_header_box
+
+ dref_payload = u32.pack(1) # entry count
+ dref_payload += full_box(b'url ', 0, SELF_CONTAINED, b'') # Data Entry URL Box
+ dinf_payload = full_box(b'dref', 0, 0, dref_payload) # Data Reference Box
+ minf_payload += box(b'dinf', dinf_payload) # Data Information Box
+
+ stsd_payload = u32.pack(1) # entry count
+
+ sample_entry_payload = u8.pack(0) * 6 # reserved
+ sample_entry_payload += u16.pack(1) # data reference index
+ if stream_type == 'audio':
+ sample_entry_payload += u32.pack(0) * 2 # reserved
+ sample_entry_payload += u16.pack(params.get('channels', 2))
+ sample_entry_payload += u16.pack(params.get('bits_per_sample', 16))
+ sample_entry_payload += u16.pack(0) # pre defined
+ sample_entry_payload += u16.pack(0) # reserved
+ sample_entry_payload += u1616.pack(params['sampling_rate'])
+
+ if fourcc == 'AACL':
+ sample_entry_box = box(b'mp4a', sample_entry_payload)
+ if fourcc == 'EC-3':
+ sample_entry_box = box(b'ec-3', sample_entry_payload)
+ elif stream_type == 'video':
+ sample_entry_payload += u16.pack(0) # pre defined
+ sample_entry_payload += u16.pack(0) # reserved
+ sample_entry_payload += u32.pack(0) * 3 # pre defined
+ sample_entry_payload += u16.pack(width)
+ sample_entry_payload += u16.pack(height)
+ sample_entry_payload += u1616.pack(0x48) # horiz resolution 72 dpi
+ sample_entry_payload += u1616.pack(0x48) # vert resolution 72 dpi
+ sample_entry_payload += u32.pack(0) # reserved
+ sample_entry_payload += u16.pack(1) # frame count
+ sample_entry_payload += u8.pack(0) * 32 # compressor name
+ sample_entry_payload += u16.pack(0x18) # depth
+ sample_entry_payload += s16.pack(-1) # pre defined
+
+ codec_private_data = binascii.unhexlify(params['codec_private_data'].encode())
+ if fourcc in ('H264', 'AVC1'):
+ sps, pps = codec_private_data.split(u32.pack(1))[1:]
+ avcc_payload = u8.pack(1) # configuration version
+ avcc_payload += sps[1:4] # avc profile indication + profile compatibility + avc level indication
+ avcc_payload += u8.pack(0xfc | (params.get('nal_unit_length_field', 4) - 1)) # complete representation (1) + reserved (11111) + length size minus one
+ avcc_payload += u8.pack(1) # reserved (0) + number of sps (0000001)
+ avcc_payload += u16.pack(len(sps))
+ avcc_payload += sps
+ avcc_payload += u8.pack(1) # number of pps
+ avcc_payload += u16.pack(len(pps))
+ avcc_payload += pps
+ sample_entry_payload += box(b'avcC', avcc_payload) # AVC Decoder Configuration Record
+ sample_entry_box = box(b'avc1', sample_entry_payload) # AVC Simple Entry
+ else:
+ assert False
+ elif stream_type == 'text':
+ if fourcc == 'TTML':
+ sample_entry_payload += b'http://www.w3.org/ns/ttml\0' # namespace
+ sample_entry_payload += b'\0' # schema location
+ sample_entry_payload += b'\0' # auxilary mime types(??)
+ sample_entry_box = box(b'stpp', sample_entry_payload)
+ else:
+ assert False
+ else:
+ assert False
+ stsd_payload += sample_entry_box
+
+ stbl_payload = full_box(b'stsd', 0, 0, stsd_payload) # Sample Description Box
+
+ stts_payload = u32.pack(0) # entry count
+ stbl_payload += full_box(b'stts', 0, 0, stts_payload) # Decoding Time to Sample Box
+
+ stsc_payload = u32.pack(0) # entry count
+ stbl_payload += full_box(b'stsc', 0, 0, stsc_payload) # Sample To Chunk Box
+
+ stco_payload = u32.pack(0) # entry count
+ stbl_payload += full_box(b'stco', 0, 0, stco_payload) # Chunk Offset Box
+
+ minf_payload += box(b'stbl', stbl_payload) # Sample Table Box
+
+ mdia_payload += box(b'minf', minf_payload) # Media Information Box
+
+ trak_payload += box(b'mdia', mdia_payload) # Media Box
+
+ moov_payload += box(b'trak', trak_payload) # Track Box
+
+ mehd_payload = u64.pack(duration)
+ mvex_payload = full_box(b'mehd', 1, 0, mehd_payload) # Movie Extends Header Box
+
+ trex_payload = u32.pack(track_id) # track id
+ trex_payload += u32.pack(1) # default sample description index
+ trex_payload += u32.pack(0) # default sample duration
+ trex_payload += u32.pack(0) # default sample size
+ trex_payload += u32.pack(0) # default sample flags
+ mvex_payload += full_box(b'trex', 0, 0, trex_payload) # Track Extends Box
+
+ moov_payload += box(b'mvex', mvex_payload) # Movie Extends Box
+ stream.write(box(b'moov', moov_payload)) # Movie Box
+
+
+def extract_box_data(data, box_sequence):
+ data_reader = io.BytesIO(data)
+ while True:
+ box_size = u32.unpack(data_reader.read(4))[0]
+ box_type = data_reader.read(4)
+ if box_type == box_sequence[0]:
+ box_data = data_reader.read(box_size - 8)
+ if len(box_sequence) == 1:
+ return box_data
+ return extract_box_data(box_data, box_sequence[1:])
+ data_reader.seek(box_size - 8, 1)
+
+
+class IsmFD(FragmentFD):
+ """
+ Download segments in a ISM manifest
+ """
+
+ def real_download(self, filename, info_dict):
+ segments = info_dict['fragments'][:1] if self.params.get(
+ 'test', False) else info_dict['fragments']
+
+ ctx = {
+ 'filename': filename,
+ 'total_frags': len(segments),
+ }
+
+ self._prepare_and_start_frag_download(ctx, info_dict)
+
+ extra_state = ctx.setdefault('extra_state', {
+ 'ism_track_written': False,
+ })
+
+ skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
+
+ frag_index = 0
+ for i, segment in enumerate(segments):
+ frag_index += 1
+ if frag_index <= ctx['fragment_index']:
+ continue
+
+ retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry,
+ frag_index=frag_index, fatal=not skip_unavailable_fragments)
+ for retry in retry_manager:
+ try:
+ success = self._download_fragment(ctx, segment['url'], info_dict)
+ if not success:
+ return False
+ frag_content = self._read_fragment(ctx)
+
+ if not extra_state['ism_track_written']:
+ tfhd_data = extract_box_data(frag_content, [b'moof', b'traf', b'tfhd'])
+ info_dict['_download_params']['track_id'] = u32.unpack(tfhd_data[4:8])[0]
+ write_piff_header(ctx['dest_stream'], info_dict['_download_params'])
+ extra_state['ism_track_written'] = True
+ self._append_fragment(ctx, frag_content)
+ except HTTPError as err:
+ retry.error = err
+ continue
+
+ if retry_manager.error:
+ if not skip_unavailable_fragments:
+ return False
+ self.report_skip_fragment(frag_index)
+
+ return self._finish_frag_download(ctx, info_dict)
diff --git a/yt_dlp/downloader/mhtml.py b/yt_dlp/downloader/mhtml.py
new file mode 100644
index 0000000..d977dce
--- /dev/null
+++ b/yt_dlp/downloader/mhtml.py
@@ -0,0 +1,189 @@
+import io
+import quopri
+import re
+import uuid
+
+from .fragment import FragmentFD
+from ..compat import imghdr
+from ..utils import escapeHTML, formatSeconds, srt_subtitles_timecode, urljoin
+from ..version import __version__ as YT_DLP_VERSION
+
+
+class MhtmlFD(FragmentFD):
+ _STYLESHEET = """\
+html, body {
+ margin: 0;
+ padding: 0;
+ height: 100vh;
+}
+
+html {
+ overflow-y: scroll;
+ scroll-snap-type: y mandatory;
+}
+
+body {
+ scroll-snap-type: y mandatory;
+ display: flex;
+ flex-flow: column;
+}
+
+body > figure {
+ max-width: 100vw;
+ max-height: 100vh;
+ scroll-snap-align: center;
+}
+
+body > figure > figcaption {
+ text-align: center;
+ height: 2.5em;
+}
+
+body > figure > img {
+ display: block;
+ margin: auto;
+ max-width: 100%;
+ max-height: calc(100vh - 5em);
+}
+"""
+ _STYLESHEET = re.sub(r'\s+', ' ', _STYLESHEET)
+ _STYLESHEET = re.sub(r'\B \B|(?<=[\w\-]) (?=[^\w\-])|(?<=[^\w\-]) (?=[\w\-])', '', _STYLESHEET)
+
+ @staticmethod
+ def _escape_mime(s):
+ return '=?utf-8?Q?' + (b''.join(
+ bytes((b,)) if b >= 0x20 else b'=%02X' % b
+ for b in quopri.encodestring(s.encode(), header=True)
+ )).decode('us-ascii') + '?='
+
+ def _gen_cid(self, i, fragment, frag_boundary):
+ return '%u.%s@yt-dlp.github.io.invalid' % (i, frag_boundary)
+
+ def _gen_stub(self, *, fragments, frag_boundary, title):
+ output = io.StringIO()
+
+ output.write((
+ '<!DOCTYPE html>'
+ '<html>'
+ '<head>'
+ '' '<meta name="generator" content="yt-dlp {version}">'
+ '' '<title>{title}</title>'
+ '' '<style>{styles}</style>'
+ '<body>'
+ ).format(
+ version=escapeHTML(YT_DLP_VERSION),
+ styles=self._STYLESHEET,
+ title=escapeHTML(title)
+ ))
+
+ t0 = 0
+ for i, frag in enumerate(fragments):
+ output.write('<figure>')
+ try:
+ t1 = t0 + frag['duration']
+ output.write((
+ '<figcaption>Slide #{num}: {t0} – {t1} (duration: {duration})</figcaption>'
+ ).format(
+ num=i + 1,
+ t0=srt_subtitles_timecode(t0),
+ t1=srt_subtitles_timecode(t1),
+ duration=formatSeconds(frag['duration'], msec=True)
+ ))
+ except (KeyError, ValueError, TypeError):
+ t1 = None
+ output.write((
+ '<figcaption>Slide #{num}</figcaption>'
+ ).format(num=i + 1))
+ output.write('<img src="cid:{cid}">'.format(
+ cid=self._gen_cid(i, frag, frag_boundary)))
+ output.write('</figure>')
+ t0 = t1
+
+ return output.getvalue()
+
+ def real_download(self, filename, info_dict):
+ fragment_base_url = info_dict.get('fragment_base_url')
+ fragments = info_dict['fragments'][:1] if self.params.get(
+ 'test', False) else info_dict['fragments']
+ title = info_dict.get('title', info_dict['format_id'])
+ origin = info_dict.get('webpage_url', info_dict['url'])
+
+ ctx = {
+ 'filename': filename,
+ 'total_frags': len(fragments),
+ }
+
+ self._prepare_and_start_frag_download(ctx, info_dict)
+
+ extra_state = ctx.setdefault('extra_state', {
+ 'header_written': False,
+ 'mime_boundary': str(uuid.uuid4()).replace('-', ''),
+ })
+
+ frag_boundary = extra_state['mime_boundary']
+
+ if not extra_state['header_written']:
+ stub = self._gen_stub(
+ fragments=fragments,
+ frag_boundary=frag_boundary,
+ title=title
+ )
+
+ ctx['dest_stream'].write((
+ 'MIME-Version: 1.0\r\n'
+ 'From: <nowhere@yt-dlp.github.io.invalid>\r\n'
+ 'To: <nowhere@yt-dlp.github.io.invalid>\r\n'
+ 'Subject: {title}\r\n'
+ 'Content-type: multipart/related; '
+ '' 'boundary="{boundary}"; '
+ '' 'type="text/html"\r\n'
+ 'X.yt-dlp.Origin: {origin}\r\n'
+ '\r\n'
+ '--{boundary}\r\n'
+ 'Content-Type: text/html; charset=utf-8\r\n'
+ 'Content-Length: {length}\r\n'
+ '\r\n'
+ '{stub}\r\n'
+ ).format(
+ origin=origin,
+ boundary=frag_boundary,
+ length=len(stub),
+ title=self._escape_mime(title),
+ stub=stub
+ ).encode())
+ extra_state['header_written'] = True
+
+ for i, fragment in enumerate(fragments):
+ if (i + 1) <= ctx['fragment_index']:
+ continue
+
+ fragment_url = fragment.get('url')
+ if not fragment_url:
+ assert fragment_base_url
+ fragment_url = urljoin(fragment_base_url, fragment['path'])
+
+ success = self._download_fragment(ctx, fragment_url, info_dict)
+ if not success:
+ continue
+ frag_content = self._read_fragment(ctx)
+
+ frag_header = io.BytesIO()
+ frag_header.write(
+ b'--%b\r\n' % frag_boundary.encode('us-ascii'))
+ frag_header.write(
+ b'Content-ID: <%b>\r\n' % self._gen_cid(i, fragment, frag_boundary).encode('us-ascii'))
+ frag_header.write(
+ b'Content-type: %b\r\n' % f'image/{imghdr.what(h=frag_content) or "jpeg"}'.encode())
+ frag_header.write(
+ b'Content-length: %u\r\n' % len(frag_content))
+ frag_header.write(
+ b'Content-location: %b\r\n' % fragment_url.encode('us-ascii'))
+ frag_header.write(
+ b'X.yt-dlp.Duration: %f\r\n' % fragment['duration'])
+ frag_header.write(b'\r\n')
+ self._append_fragment(
+ ctx, frag_header.getvalue() + frag_content + b'\r\n')
+
+ ctx['dest_stream'].write(
+ b'--%b--\r\n\r\n' % frag_boundary.encode('us-ascii'))
+ return self._finish_frag_download(ctx, info_dict)
diff --git a/yt_dlp/downloader/niconico.py b/yt_dlp/downloader/niconico.py
new file mode 100644
index 0000000..fef8bff
--- /dev/null
+++ b/yt_dlp/downloader/niconico.py
@@ -0,0 +1,140 @@
+import json
+import threading
+import time
+
+from . import get_suitable_downloader
+from .common import FileDownloader
+from .external import FFmpegFD
+from ..networking import Request
+from ..utils import DownloadError, str_or_none, try_get
+
+
+class NiconicoDmcFD(FileDownloader):
+ """ Downloading niconico douga from DMC with heartbeat """
+
+ def real_download(self, filename, info_dict):
+ from ..extractor.niconico import NiconicoIE
+
+ self.to_screen('[%s] Downloading from DMC' % self.FD_NAME)
+ ie = NiconicoIE(self.ydl)
+ info_dict, heartbeat_info_dict = ie._get_heartbeat_info(info_dict)
+
+ fd = get_suitable_downloader(info_dict, params=self.params)(self.ydl, self.params)
+
+ success = download_complete = False
+ timer = [None]
+ heartbeat_lock = threading.Lock()
+ heartbeat_url = heartbeat_info_dict['url']
+ heartbeat_data = heartbeat_info_dict['data'].encode()
+ heartbeat_interval = heartbeat_info_dict.get('interval', 30)
+
+ request = Request(heartbeat_url, heartbeat_data)
+
+ def heartbeat():
+ try:
+ self.ydl.urlopen(request).read()
+ except Exception:
+ self.to_screen('[%s] Heartbeat failed' % self.FD_NAME)
+
+ with heartbeat_lock:
+ if not download_complete:
+ timer[0] = threading.Timer(heartbeat_interval, heartbeat)
+ timer[0].start()
+
+ heartbeat_info_dict['ping']()
+ self.to_screen('[%s] Heartbeat with %d second interval ...' % (self.FD_NAME, heartbeat_interval))
+ try:
+ heartbeat()
+ if type(fd).__name__ == 'HlsFD':
+ info_dict.update(ie._extract_m3u8_formats(info_dict['url'], info_dict['id'])[0])
+ success = fd.real_download(filename, info_dict)
+ finally:
+ if heartbeat_lock:
+ with heartbeat_lock:
+ timer[0].cancel()
+ download_complete = True
+ return success
+
+
+class NiconicoLiveFD(FileDownloader):
+ """ Downloads niconico live without being stopped """
+
+ def real_download(self, filename, info_dict):
+ video_id = info_dict['video_id']
+ ws_url = info_dict['url']
+ ws_extractor = info_dict['ws']
+ ws_origin_host = info_dict['origin']
+ live_quality = info_dict.get('live_quality', 'high')
+ live_latency = info_dict.get('live_latency', 'high')
+ dl = FFmpegFD(self.ydl, self.params or {})
+
+ new_info_dict = info_dict.copy()
+ new_info_dict.update({
+ 'protocol': 'm3u8',
+ })
+
+ def communicate_ws(reconnect):
+ if reconnect:
+ ws = self.ydl.urlopen(Request(ws_url, headers={'Origin': f'https://{ws_origin_host}'}))
+ if self.ydl.params.get('verbose', False):
+ self.to_screen('[debug] Sending startWatching request')
+ ws.send(json.dumps({
+ 'type': 'startWatching',
+ 'data': {
+ 'stream': {
+ 'quality': live_quality,
+ 'protocol': 'hls+fmp4',
+ 'latency': live_latency,
+ 'chasePlay': False
+ },
+ 'room': {
+ 'protocol': 'webSocket',
+ 'commentable': True
+ },
+ 'reconnect': True,
+ }
+ }))
+ else:
+ ws = ws_extractor
+ with ws:
+ while True:
+ recv = ws.recv()
+ if not recv:
+ continue
+ data = json.loads(recv)
+ if not data or not isinstance(data, dict):
+ continue
+ if data.get('type') == 'ping':
+ # pong back
+ ws.send(r'{"type":"pong"}')
+ ws.send(r'{"type":"keepSeat"}')
+ elif data.get('type') == 'disconnect':
+ self.write_debug(data)
+ return True
+ elif data.get('type') == 'error':
+ self.write_debug(data)
+ message = try_get(data, lambda x: x['body']['code'], str) or recv
+ return DownloadError(message)
+ elif self.ydl.params.get('verbose', False):
+ if len(recv) > 100:
+ recv = recv[:100] + '...'
+ self.to_screen('[debug] Server said: %s' % recv)
+
+ def ws_main():
+ reconnect = False
+ while True:
+ try:
+ ret = communicate_ws(reconnect)
+ if ret is True:
+ return
+ except BaseException as e:
+ self.to_screen('[%s] %s: Connection error occured, reconnecting after 10 seconds: %s' % ('niconico:live', video_id, str_or_none(e)))
+ time.sleep(10)
+ continue
+ finally:
+ reconnect = True
+
+ thread = threading.Thread(target=ws_main, daemon=True)
+ thread.start()
+
+ return dl.download(filename, new_info_dict)
diff --git a/yt_dlp/downloader/rtmp.py b/yt_dlp/downloader/rtmp.py
new file mode 100644
index 0000000..0e09525
--- /dev/null
+++ b/yt_dlp/downloader/rtmp.py
@@ -0,0 +1,213 @@
+import os
+import re
+import subprocess
+import time
+
+from .common import FileDownloader
+from ..utils import (
+ Popen,
+ check_executable,
+ encodeArgument,
+ encodeFilename,
+ get_exe_version,
+)
+
+
+def rtmpdump_version():
+ return get_exe_version(
+ 'rtmpdump', ['--help'], r'(?i)RTMPDump\s*v?([0-9a-zA-Z._-]+)')
+
+
+class RtmpFD(FileDownloader):
+ def real_download(self, filename, info_dict):
+ def run_rtmpdump(args):
+ start = time.time()
+ resume_percent = None
+ resume_downloaded_data_len = None
+ proc = Popen(args, stderr=subprocess.PIPE)
+ cursor_in_new_line = True
+ proc_stderr_closed = False
+ try:
+ while not proc_stderr_closed:
+ # read line from stderr
+ line = ''
+ while True:
+ char = proc.stderr.read(1)
+ if not char:
+ proc_stderr_closed = True
+ break
+ if char in [b'\r', b'\n']:
+ break
+ line += char.decode('ascii', 'replace')
+ if not line:
+ # proc_stderr_closed is True
+ continue
+ mobj = re.search(r'([0-9]+\.[0-9]{3}) kB / [0-9]+\.[0-9]{2} sec \(([0-9]{1,2}\.[0-9])%\)', line)
+ if mobj:
+ downloaded_data_len = int(float(mobj.group(1)) * 1024)
+ percent = float(mobj.group(2))
+ if not resume_percent:
+ resume_percent = percent
+ resume_downloaded_data_len = downloaded_data_len
+ time_now = time.time()
+ eta = self.calc_eta(start, time_now, 100 - resume_percent, percent - resume_percent)
+ speed = self.calc_speed(start, time_now, downloaded_data_len - resume_downloaded_data_len)
+ data_len = None
+ if percent > 0:
+ data_len = int(downloaded_data_len * 100 / percent)
+ self._hook_progress({
+ 'status': 'downloading',
+ 'downloaded_bytes': downloaded_data_len,
+ 'total_bytes_estimate': data_len,
+ 'tmpfilename': tmpfilename,
+ 'filename': filename,
+ 'eta': eta,
+ 'elapsed': time_now - start,
+ 'speed': speed,
+ }, info_dict)
+ cursor_in_new_line = False
+ else:
+ # no percent for live streams
+ mobj = re.search(r'([0-9]+\.[0-9]{3}) kB / [0-9]+\.[0-9]{2} sec', line)
+ if mobj:
+ downloaded_data_len = int(float(mobj.group(1)) * 1024)
+ time_now = time.time()
+ speed = self.calc_speed(start, time_now, downloaded_data_len)
+ self._hook_progress({
+ 'downloaded_bytes': downloaded_data_len,
+ 'tmpfilename': tmpfilename,
+ 'filename': filename,
+ 'status': 'downloading',
+ 'elapsed': time_now - start,
+ 'speed': speed,
+ }, info_dict)
+ cursor_in_new_line = False
+ elif self.params.get('verbose', False):
+ if not cursor_in_new_line:
+ self.to_screen('')
+ cursor_in_new_line = True
+ self.to_screen('[rtmpdump] ' + line)
+ if not cursor_in_new_line:
+ self.to_screen('')
+ return proc.wait()
+ except BaseException: # Including KeyboardInterrupt
+ proc.kill(timeout=None)
+ raise
+
+ url = info_dict['url']
+ player_url = info_dict.get('player_url')
+ page_url = info_dict.get('page_url')
+ app = info_dict.get('app')
+ play_path = info_dict.get('play_path')
+ tc_url = info_dict.get('tc_url')
+ flash_version = info_dict.get('flash_version')
+ live = info_dict.get('rtmp_live', False)
+ conn = info_dict.get('rtmp_conn')
+ protocol = info_dict.get('rtmp_protocol')
+ real_time = info_dict.get('rtmp_real_time', False)
+ no_resume = info_dict.get('no_resume', False)
+ continue_dl = self.params.get('continuedl', True)
+
+ self.report_destination(filename)
+ tmpfilename = self.temp_name(filename)
+ test = self.params.get('test', False)
+
+ # Check for rtmpdump first
+ if not check_executable('rtmpdump', ['-h']):
+ self.report_error('RTMP download detected but "rtmpdump" could not be run. Please install')
+ return False
+
+ # Download using rtmpdump. rtmpdump returns exit code 2 when
+ # the connection was interrupted and resuming appears to be
+ # possible. This is part of rtmpdump's normal usage, AFAIK.
+ basic_args = [
+ 'rtmpdump', '--verbose', '-r', url,
+ '-o', tmpfilename]
+ if player_url is not None:
+ basic_args += ['--swfVfy', player_url]
+ if page_url is not None:
+ basic_args += ['--pageUrl', page_url]
+ if app is not None:
+ basic_args += ['--app', app]
+ if play_path is not None:
+ basic_args += ['--playpath', play_path]
+ if tc_url is not None:
+ basic_args += ['--tcUrl', tc_url]
+ if test:
+ basic_args += ['--stop', '1']
+ if flash_version is not None:
+ basic_args += ['--flashVer', flash_version]
+ if live:
+ basic_args += ['--live']
+ if isinstance(conn, list):
+ for entry in conn:
+ basic_args += ['--conn', entry]
+ elif isinstance(conn, str):
+ basic_args += ['--conn', conn]
+ if protocol is not None:
+ basic_args += ['--protocol', protocol]
+ if real_time:
+ basic_args += ['--realtime']
+
+ args = basic_args
+ if not no_resume and continue_dl and not live:
+ args += ['--resume']
+ if not live and continue_dl:
+ args += ['--skip', '1']
+
+ args = [encodeArgument(a) for a in args]
+
+ self._debug_cmd(args, exe='rtmpdump')
+
+ RD_SUCCESS = 0
+ RD_FAILED = 1
+ RD_INCOMPLETE = 2
+ RD_NO_CONNECT = 3
+
+ started = time.time()
+
+ try:
+ retval = run_rtmpdump(args)
+ except KeyboardInterrupt:
+ if not info_dict.get('is_live'):
+ raise
+ retval = RD_SUCCESS
+ self.to_screen('\n[rtmpdump] Interrupted by user')
+
+ if retval == RD_NO_CONNECT:
+ self.report_error('[rtmpdump] Could not connect to RTMP server.')
+ return False
+
+ while retval in (RD_INCOMPLETE, RD_FAILED) and not test and not live:
+ prevsize = os.path.getsize(encodeFilename(tmpfilename))
+ self.to_screen('[rtmpdump] Downloaded %s bytes' % prevsize)
+ time.sleep(5.0) # This seems to be needed
+ args = basic_args + ['--resume']
+ if retval == RD_FAILED:
+ args += ['--skip', '1']
+ args = [encodeArgument(a) for a in args]
+ retval = run_rtmpdump(args)
+ cursize = os.path.getsize(encodeFilename(tmpfilename))
+ if prevsize == cursize and retval == RD_FAILED:
+ break
+ # Some rtmp streams seem abort after ~ 99.8%. Don't complain for those
+ if prevsize == cursize and retval == RD_INCOMPLETE and cursize > 1024:
+ self.to_screen('[rtmpdump] Could not download the whole video. This can happen for some advertisements.')
+ retval = RD_SUCCESS
+ break
+ if retval == RD_SUCCESS or (test and retval == RD_INCOMPLETE):
+ fsize = os.path.getsize(encodeFilename(tmpfilename))
+ self.to_screen('[rtmpdump] Downloaded %s bytes' % fsize)
+ self.try_rename(tmpfilename, filename)
+ self._hook_progress({
+ 'downloaded_bytes': fsize,
+ 'total_bytes': fsize,
+ 'filename': filename,
+ 'status': 'finished',
+ 'elapsed': time.time() - started,
+ }, info_dict)
+ return True
+ else:
+ self.to_stderr('\n')
+ self.report_error('rtmpdump exited with code %d' % retval)
+ return False
diff --git a/yt_dlp/downloader/rtsp.py b/yt_dlp/downloader/rtsp.py
new file mode 100644
index 0000000..e89269f
--- /dev/null
+++ b/yt_dlp/downloader/rtsp.py
@@ -0,0 +1,42 @@
+import os
+import subprocess
+
+from .common import FileDownloader
+from ..utils import check_executable, encodeFilename
+
+
+class RtspFD(FileDownloader):
+ def real_download(self, filename, info_dict):
+ url = info_dict['url']
+ self.report_destination(filename)
+ tmpfilename = self.temp_name(filename)
+
+ if check_executable('mplayer', ['-h']):
+ args = [
+ 'mplayer', '-really-quiet', '-vo', 'null', '-vc', 'dummy',
+ '-dumpstream', '-dumpfile', tmpfilename, url]
+ elif check_executable('mpv', ['-h']):
+ args = [
+ 'mpv', '-really-quiet', '--vo=null', '--stream-dump=' + tmpfilename, url]
+ else:
+ self.report_error('MMS or RTSP download detected but neither "mplayer" nor "mpv" could be run. Please install one')
+ return False
+
+ self._debug_cmd(args)
+
+ retval = subprocess.call(args)
+ if retval == 0:
+ fsize = os.path.getsize(encodeFilename(tmpfilename))
+ self.to_screen(f'\r[{args[0]}] {fsize} bytes')
+ self.try_rename(tmpfilename, filename)
+ self._hook_progress({
+ 'downloaded_bytes': fsize,
+ 'total_bytes': fsize,
+ 'filename': filename,
+ 'status': 'finished',
+ }, info_dict)
+ return True
+ else:
+ self.to_stderr('\n')
+ self.report_error('%s exited with code %d' % (args[0], retval))
+ return False
diff --git a/yt_dlp/downloader/websocket.py b/yt_dlp/downloader/websocket.py
new file mode 100644
index 0000000..6837ff1
--- /dev/null
+++ b/yt_dlp/downloader/websocket.py
@@ -0,0 +1,53 @@
+import asyncio
+import contextlib
+import os
+import signal
+import threading
+
+from .common import FileDownloader
+from .external import FFmpegFD
+from ..dependencies import websockets
+
+
+class FFmpegSinkFD(FileDownloader):
+ """ A sink to ffmpeg for downloading fragments in any form """
+
+ def real_download(self, filename, info_dict):
+ info_copy = info_dict.copy()
+ info_copy['url'] = '-'
+
+ async def call_conn(proc, stdin):
+ try:
+ await self.real_connection(stdin, info_dict)
+ except OSError:
+ pass
+ finally:
+ with contextlib.suppress(OSError):
+ stdin.flush()
+ stdin.close()
+ os.kill(os.getpid(), signal.SIGINT)
+
+ class FFmpegStdinFD(FFmpegFD):
+ @classmethod
+ def get_basename(cls):
+ return FFmpegFD.get_basename()
+
+ def on_process_started(self, proc, stdin):
+ thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), ))
+ thread.start()
+
+ return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy)
+
+ async def real_connection(self, sink, info_dict):
+ """ Override this in subclasses """
+ raise NotImplementedError('This method must be implemented by subclasses')
+
+
+class WebSocketFragmentFD(FFmpegSinkFD):
+ async def real_connection(self, sink, info_dict):
+ async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws:
+ while True:
+ recv = await ws.recv()
+ if isinstance(recv, str):
+ recv = recv.encode('utf8')
+ sink.write(recv)
diff --git a/yt_dlp/downloader/youtube_live_chat.py b/yt_dlp/downloader/youtube_live_chat.py
new file mode 100644
index 0000000..c7a8637
--- /dev/null
+++ b/yt_dlp/downloader/youtube_live_chat.py
@@ -0,0 +1,228 @@
+import json
+import time
+
+from .fragment import FragmentFD
+from ..networking.exceptions import HTTPError
+from ..utils import (
+ RegexNotFoundError,
+ RetryManager,
+ dict_get,
+ int_or_none,
+ try_get,
+)
+from ..utils.networking import HTTPHeaderDict
+
+
+class YoutubeLiveChatFD(FragmentFD):
+ """ Downloads YouTube live chats fragment by fragment """
+
+ def real_download(self, filename, info_dict):
+ video_id = info_dict['video_id']
+ self.to_screen('[%s] Downloading live chat' % self.FD_NAME)
+ if not self.params.get('skip_download') and info_dict['protocol'] == 'youtube_live_chat':
+ self.report_warning('Live chat download runs until the livestream ends. '
+ 'If you wish to download the video simultaneously, run a separate yt-dlp instance')
+
+ test = self.params.get('test', False)
+
+ ctx = {
+ 'filename': filename,
+ 'live': True,
+ 'total_frags': None,
+ }
+
+ from ..extractor.youtube import YoutubeBaseInfoExtractor
+
+ ie = YoutubeBaseInfoExtractor(self.ydl)
+
+ start_time = int(time.time() * 1000)
+
+ def dl_fragment(url, data=None, headers=None):
+ http_headers = HTTPHeaderDict(info_dict.get('http_headers'), headers)
+ return self._download_fragment(ctx, url, info_dict, http_headers, data)
+
+ def parse_actions_replay(live_chat_continuation):
+ offset = continuation_id = click_tracking_params = None
+ processed_fragment = bytearray()
+ for action in live_chat_continuation.get('actions', []):
+ if 'replayChatItemAction' in action:
+ replay_chat_item_action = action['replayChatItemAction']
+ offset = int(replay_chat_item_action['videoOffsetTimeMsec'])
+ processed_fragment.extend(
+ json.dumps(action, ensure_ascii=False).encode() + b'\n')
+ if offset is not None:
+ continuation = try_get(
+ live_chat_continuation,
+ lambda x: x['continuations'][0]['liveChatReplayContinuationData'], dict)
+ if continuation:
+ continuation_id = continuation.get('continuation')
+ click_tracking_params = continuation.get('clickTrackingParams')
+ self._append_fragment(ctx, processed_fragment)
+ return continuation_id, offset, click_tracking_params
+
+ def try_refresh_replay_beginning(live_chat_continuation):
+ # choose the second option that contains the unfiltered live chat replay
+ refresh_continuation = try_get(
+ live_chat_continuation,
+ lambda x: x['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData'], dict)
+ if refresh_continuation:
+ # no data yet but required to call _append_fragment
+ self._append_fragment(ctx, b'')
+ refresh_continuation_id = refresh_continuation.get('continuation')
+ offset = 0
+ click_tracking_params = refresh_continuation.get('trackingParams')
+ return refresh_continuation_id, offset, click_tracking_params
+ return parse_actions_replay(live_chat_continuation)
+
+ live_offset = 0
+
+ def parse_actions_live(live_chat_continuation):
+ nonlocal live_offset
+ continuation_id = click_tracking_params = None
+ processed_fragment = bytearray()
+ for action in live_chat_continuation.get('actions', []):
+ timestamp = self.parse_live_timestamp(action)
+ if timestamp is not None:
+ live_offset = timestamp - start_time
+ # compatibility with replay format
+ pseudo_action = {
+ 'replayChatItemAction': {'actions': [action]},
+ 'videoOffsetTimeMsec': str(live_offset),
+ 'isLive': True,
+ }
+ processed_fragment.extend(
+ json.dumps(pseudo_action, ensure_ascii=False).encode() + b'\n')
+ continuation_data_getters = [
+ lambda x: x['continuations'][0]['invalidationContinuationData'],
+ lambda x: x['continuations'][0]['timedContinuationData'],
+ ]
+ continuation_data = try_get(live_chat_continuation, continuation_data_getters, dict)
+ if continuation_data:
+ continuation_id = continuation_data.get('continuation')
+ click_tracking_params = continuation_data.get('clickTrackingParams')
+ timeout_ms = int_or_none(continuation_data.get('timeoutMs'))
+ if timeout_ms is not None:
+ time.sleep(timeout_ms / 1000)
+ self._append_fragment(ctx, processed_fragment)
+ return continuation_id, live_offset, click_tracking_params
+
+ def download_and_parse_fragment(url, frag_index, request_data=None, headers=None):
+ for retry in RetryManager(self.params.get('fragment_retries'), self.report_retry, frag_index=frag_index):
+ try:
+ success = dl_fragment(url, request_data, headers)
+ if not success:
+ return False, None, None, None
+ raw_fragment = self._read_fragment(ctx)
+ try:
+ data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
+ except RegexNotFoundError:
+ data = None
+ if not data:
+ data = json.loads(raw_fragment)
+ live_chat_continuation = try_get(
+ data,
+ lambda x: x['continuationContents']['liveChatContinuation'], dict) or {}
+
+ func = (info_dict['protocol'] == 'youtube_live_chat' and parse_actions_live
+ or frag_index == 1 and try_refresh_replay_beginning
+ or parse_actions_replay)
+ return (True, *func(live_chat_continuation))
+ except HTTPError as err:
+ retry.error = err
+ continue
+ return False, None, None, None
+
+ self._prepare_and_start_frag_download(ctx, info_dict)
+
+ success = dl_fragment(info_dict['url'])
+ if not success:
+ return False
+ raw_fragment = self._read_fragment(ctx)
+ try:
+ data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
+ except RegexNotFoundError:
+ return False
+ continuation_id = try_get(
+ data,
+ lambda x: x['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation'])
+ # no data yet but required to call _append_fragment
+ self._append_fragment(ctx, b'')
+
+ ytcfg = ie.extract_ytcfg(video_id, raw_fragment.decode('utf-8', 'replace'))
+
+ if not ytcfg:
+ return False
+ api_key = try_get(ytcfg, lambda x: x['INNERTUBE_API_KEY'])
+ innertube_context = try_get(ytcfg, lambda x: x['INNERTUBE_CONTEXT'])
+ if not api_key or not innertube_context:
+ return False
+ visitor_data = try_get(innertube_context, lambda x: x['client']['visitorData'], str)
+ if info_dict['protocol'] == 'youtube_live_chat_replay':
+ url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay?key=' + api_key
+ chat_page_url = 'https://www.youtube.com/live_chat_replay?continuation=' + continuation_id
+ elif info_dict['protocol'] == 'youtube_live_chat':
+ url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key=' + api_key
+ chat_page_url = 'https://www.youtube.com/live_chat?continuation=' + continuation_id
+
+ frag_index = offset = 0
+ click_tracking_params = None
+ while continuation_id is not None:
+ frag_index += 1
+ request_data = {
+ 'context': innertube_context,
+ 'continuation': continuation_id,
+ }
+ if frag_index > 1:
+ request_data['currentPlayerState'] = {'playerOffsetMs': str(max(offset - 5000, 0))}
+ if click_tracking_params:
+ request_data['context']['clickTracking'] = {'clickTrackingParams': click_tracking_params}
+ headers = ie.generate_api_headers(ytcfg=ytcfg, visitor_data=visitor_data)
+ headers.update({'content-type': 'application/json'})
+ fragment_request_data = json.dumps(request_data, ensure_ascii=False).encode() + b'\n'
+ success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
+ url, frag_index, fragment_request_data, headers)
+ else:
+ success, continuation_id, offset, click_tracking_params = download_and_parse_fragment(
+ chat_page_url, frag_index)
+ if not success:
+ return False
+ if test:
+ break
+
+ return self._finish_frag_download(ctx, info_dict)
+
+ @staticmethod
+ def parse_live_timestamp(action):
+ action_content = dict_get(
+ action,
+ ['addChatItemAction', 'addLiveChatTickerItemAction', 'addBannerToLiveChatCommand'])
+ if not isinstance(action_content, dict):
+ return None
+ item = dict_get(action_content, ['item', 'bannerRenderer'])
+ if not isinstance(item, dict):
+ return None
+ renderer = dict_get(item, [
+ # text
+ 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
+ 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
+ # ticker
+ 'liveChatTickerPaidMessageItemRenderer',
+ 'liveChatTickerSponsorItemRenderer',
+ # banner
+ 'liveChatBannerRenderer',
+ ])
+ if not isinstance(renderer, dict):
+ return None
+ parent_item_getters = [
+ lambda x: x['showItemEndpoint']['showLiveChatItemEndpoint']['renderer'],
+ lambda x: x['contents'],
+ ]
+ parent_item = try_get(renderer, parent_item_getters, dict)
+ if parent_item:
+ renderer = dict_get(parent_item, [
+ 'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
+ 'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
+ ])
+ if not isinstance(renderer, dict):
+ return None
+ return int_or_none(renderer.get('timestampUsec'), 1000)