summaryrefslogtreecommitdiffstats
path: root/yt_dlp/postprocessor/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/postprocessor/common.py')
-rw-r--r--yt_dlp/postprocessor/common.py215
1 files changed, 215 insertions, 0 deletions
diff --git a/yt_dlp/postprocessor/common.py b/yt_dlp/postprocessor/common.py
new file mode 100644
index 0000000..8cef86c
--- /dev/null
+++ b/yt_dlp/postprocessor/common.py
@@ -0,0 +1,215 @@
+import functools
+import json
+import os
+
+from ..networking import Request
+from ..networking.exceptions import HTTPError, network_exceptions
+from ..utils import (
+ PostProcessingError,
+ RetryManager,
+ _configuration_args,
+ deprecation_warning,
+ encodeFilename,
+)
+
+
+class PostProcessorMetaClass(type):
+ @staticmethod
+ def run_wrapper(func):
+ @functools.wraps(func)
+ def run(self, info, *args, **kwargs):
+ info_copy = self._copy_infodict(info)
+ self._hook_progress({'status': 'started'}, info_copy)
+ ret = func(self, info, *args, **kwargs)
+ if ret is not None:
+ _, info = ret
+ self._hook_progress({'status': 'finished'}, info_copy)
+ return ret
+ return run
+
+ def __new__(cls, name, bases, attrs):
+ if 'run' in attrs:
+ attrs['run'] = cls.run_wrapper(attrs['run'])
+ return type.__new__(cls, name, bases, attrs)
+
+
+class PostProcessor(metaclass=PostProcessorMetaClass):
+ """Post Processor class.
+
+ PostProcessor objects can be added to downloaders with their
+ add_post_processor() method. When the downloader has finished a
+ successful download, it will take its internal chain of PostProcessors
+ and start calling the run() method on each one of them, first with
+ an initial argument and then with the returned value of the previous
+ PostProcessor.
+
+ PostProcessor objects follow a "mutual registration" process similar
+ to InfoExtractor objects.
+
+ Optionally PostProcessor can use a list of additional command-line arguments
+ with self._configuration_args.
+ """
+
+ _downloader = None
+
+ def __init__(self, downloader=None):
+ self._progress_hooks = []
+ self.add_progress_hook(self.report_progress)
+ self.set_downloader(downloader)
+ self.PP_NAME = self.pp_key()
+
+ @classmethod
+ def pp_key(cls):
+ name = cls.__name__[:-2]
+ return name[6:] if name[:6].lower() == 'ffmpeg' else name
+
+ def to_screen(self, text, prefix=True, *args, **kwargs):
+ if self._downloader:
+ tag = '[%s] ' % self.PP_NAME if prefix else ''
+ return self._downloader.to_screen(f'{tag}{text}', *args, **kwargs)
+
+ def report_warning(self, text, *args, **kwargs):
+ if self._downloader:
+ return self._downloader.report_warning(text, *args, **kwargs)
+
+ def deprecation_warning(self, msg):
+ warn = getattr(self._downloader, 'deprecation_warning', deprecation_warning)
+ return warn(msg, stacklevel=1)
+
+ def deprecated_feature(self, msg):
+ if self._downloader:
+ return self._downloader.deprecated_feature(msg)
+ return deprecation_warning(msg, stacklevel=1)
+
+ def report_error(self, text, *args, **kwargs):
+ self.deprecation_warning('"yt_dlp.postprocessor.PostProcessor.report_error" is deprecated. '
+ 'raise "yt_dlp.utils.PostProcessingError" instead')
+ if self._downloader:
+ return self._downloader.report_error(text, *args, **kwargs)
+
+ def write_debug(self, text, *args, **kwargs):
+ if self._downloader:
+ return self._downloader.write_debug(text, *args, **kwargs)
+
+ def _delete_downloaded_files(self, *files_to_delete, **kwargs):
+ if self._downloader:
+ return self._downloader._delete_downloaded_files(*files_to_delete, **kwargs)
+ for filename in set(filter(None, files_to_delete)):
+ os.remove(filename)
+
+ def get_param(self, name, default=None, *args, **kwargs):
+ if self._downloader:
+ return self._downloader.params.get(name, default, *args, **kwargs)
+ return default
+
+ def set_downloader(self, downloader):
+ """Sets the downloader for this PP."""
+ self._downloader = downloader
+ for ph in getattr(downloader, '_postprocessor_hooks', []):
+ self.add_progress_hook(ph)
+
+ def _copy_infodict(self, info_dict):
+ return getattr(self._downloader, '_copy_infodict', dict)(info_dict)
+
+ @staticmethod
+ def _restrict_to(*, video=True, audio=True, images=True, simulated=True):
+ allowed = {'video': video, 'audio': audio, 'images': images}
+
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(self, info):
+ if not simulated and (self.get_param('simulate') or self.get_param('skip_download')):
+ return [], info
+ format_type = (
+ 'video' if info.get('vcodec') != 'none'
+ else 'audio' if info.get('acodec') != 'none'
+ else 'images')
+ if allowed[format_type]:
+ return func(self, info)
+ else:
+ self.to_screen('Skipping %s' % format_type)
+ return [], info
+ return wrapper
+ return decorator
+
+ def run(self, information):
+ """Run the PostProcessor.
+
+ The "information" argument is a dictionary like the ones
+ composed by InfoExtractors. The only difference is that this
+ one has an extra field called "filepath" that points to the
+ downloaded file.
+
+ This method returns a tuple, the first element is a list of the files
+ that can be deleted, and the second of which is the updated
+ information.
+
+ In addition, this method may raise a PostProcessingError
+ exception if post processing fails.
+ """
+ return [], information # by default, keep file and do nothing
+
+ def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
+ try:
+ os.utime(encodeFilename(path), (atime, mtime))
+ except Exception:
+ self.report_warning(errnote)
+
+ def _configuration_args(self, exe, *args, **kwargs):
+ return _configuration_args(
+ self.pp_key(), self.get_param('postprocessor_args'), exe, *args, **kwargs)
+
+ def _hook_progress(self, status, info_dict):
+ if not self._progress_hooks:
+ return
+ status.update({
+ 'info_dict': info_dict,
+ 'postprocessor': self.pp_key(),
+ })
+ for ph in self._progress_hooks:
+ ph(status)
+
+ def add_progress_hook(self, ph):
+ # See YoutubeDl.py (search for postprocessor_hooks) for a description of this interface
+ self._progress_hooks.append(ph)
+
+ def report_progress(self, s):
+ s['_default_template'] = '%(postprocessor)s %(status)s' % s
+ if not self._downloader:
+ return
+
+ progress_dict = s.copy()
+ progress_dict.pop('info_dict')
+ progress_dict = {'info': s['info_dict'], 'progress': progress_dict}
+
+ progress_template = self.get_param('progress_template', {})
+ tmpl = progress_template.get('postprocess')
+ if tmpl:
+ self._downloader.to_screen(
+ self._downloader.evaluate_outtmpl(tmpl, progress_dict), quiet=False)
+
+ self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
+ progress_template.get('postprocess-title') or 'yt-dlp %(progress._default_template)s',
+ progress_dict))
+
+ def _retry_download(self, err, count, retries):
+ # While this is not an extractor, it behaves similar to one and
+ # so obey extractor_retries and "--retry-sleep extractor"
+ RetryManager.report_retry(err, count, retries, info=self.to_screen, warn=self.report_warning,
+ sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))
+
+ def _download_json(self, url, *, expected_http_errors=(404,)):
+ self.write_debug(f'{self.PP_NAME} query: {url}')
+ for retry in RetryManager(self.get_param('extractor_retries', 3), self._retry_download):
+ try:
+ rsp = self._downloader.urlopen(Request(url))
+ except network_exceptions as e:
+ if isinstance(e, HTTPError) and e.status in expected_http_errors:
+ return None
+ retry.error = PostProcessingError(f'Unable to communicate with {self.PP_NAME} API: {e}')
+ continue
+ return json.loads(rsp.read().decode(rsp.headers.get_param('charset') or 'utf-8'))
+
+
+class AudioConversionError(PostProcessingError): # Deprecated
+ pass