summaryrefslogtreecommitdiffstats
path: root/yt_dlp/plugins.py
blob: 3cc879fd7e028bcb46f63de6077cfe15028e9ac1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import contextlib
import importlib
import importlib.abc
import importlib.machinery
import importlib.util
import inspect
import itertools
import pkgutil
import sys
import traceback
import zipimport
from pathlib import Path
from zipfile import ZipFile

from .compat import functools  # isort: split
from .utils import (
    get_executable_path,
    get_system_config_dirs,
    get_user_config_dirs,
    orderedSet,
    write_string,
)

PACKAGE_NAME = 'yt_dlp_plugins'
COMPAT_PACKAGE_NAME = 'ytdlp_plugins'


class PluginLoader(importlib.abc.Loader):
    """Dummy loader for virtual namespace packages"""

    def exec_module(self, module):
        return None


@functools.cache
def dirs_in_zip(archive):
    try:
        with ZipFile(archive) as zip_:
            return set(itertools.chain.from_iterable(
                Path(file).parents for file in zip_.namelist()))
    except FileNotFoundError:
        pass
    except Exception as e:
        write_string(f'WARNING: Could not read zip file {archive}: {e}\n')
    return set()


class PluginFinder(importlib.abc.MetaPathFinder):
    """
    This class provides one or multiple namespace packages.
    It searches in sys.path and yt-dlp config folders for
    the existing subdirectories from which the modules can be imported
    """

    def __init__(self, *packages):
        self._zip_content_cache = {}
        self.packages = set(itertools.chain.from_iterable(
            itertools.accumulate(name.split('.'), lambda a, b: '.'.join((a, b)))
            for name in packages))

    def search_locations(self, fullname):
        candidate_locations = []

        def _get_package_paths(*root_paths, containing_folder='plugins'):
            for config_dir in orderedSet(map(Path, root_paths), lazy=True):
                with contextlib.suppress(OSError):
                    yield from (config_dir / containing_folder).iterdir()

        # Load from yt-dlp config folders
        candidate_locations.extend(_get_package_paths(
            *get_user_config_dirs('yt-dlp'),
            *get_system_config_dirs('yt-dlp'),
            containing_folder='plugins'))

        # Load from yt-dlp-plugins folders
        candidate_locations.extend(_get_package_paths(
            get_executable_path(),
            *get_user_config_dirs(''),
            *get_system_config_dirs(''),
            containing_folder='yt-dlp-plugins'))

        candidate_locations.extend(map(Path, sys.path))  # PYTHONPATH
        with contextlib.suppress(ValueError):  # Added when running __main__.py directly
            candidate_locations.remove(Path(__file__).parent)

        parts = Path(*fullname.split('.'))
        for path in orderedSet(candidate_locations, lazy=True):
            candidate = path / parts
            try:
                if candidate.is_dir():
                    yield candidate
                elif path.suffix in ('.zip', '.egg', '.whl') and path.is_file():
                    if parts in dirs_in_zip(path):
                        yield candidate
            except PermissionError as e:
                write_string(f'Permission error while accessing modules in "{e.filename}"\n')

    def find_spec(self, fullname, path=None, target=None):
        if fullname not in self.packages:
            return None

        search_locations = list(map(str, self.search_locations(fullname)))
        if not search_locations:
            return None

        spec = importlib.machinery.ModuleSpec(fullname, PluginLoader(), is_package=True)
        spec.submodule_search_locations = search_locations
        return spec

    def invalidate_caches(self):
        dirs_in_zip.cache_clear()
        for package in self.packages:
            if package in sys.modules:
                del sys.modules[package]


def directories():
    spec = importlib.util.find_spec(PACKAGE_NAME)
    return spec.submodule_search_locations if spec else []


def iter_modules(subpackage):
    fullname = f'{PACKAGE_NAME}.{subpackage}'
    with contextlib.suppress(ModuleNotFoundError):
        pkg = importlib.import_module(fullname)
        yield from pkgutil.iter_modules(path=pkg.__path__, prefix=f'{fullname}.')


def load_module(module, module_name, suffix):
    return inspect.getmembers(module, lambda obj: (
        inspect.isclass(obj)
        and obj.__name__.endswith(suffix)
        and obj.__module__.startswith(module_name)
        and not obj.__name__.startswith('_')
        and obj.__name__ in getattr(module, '__all__', [obj.__name__])))


def load_plugins(name, suffix):
    classes = {}

    for finder, module_name, _ in iter_modules(name):
        if any(x.startswith('_') for x in module_name.split('.')):
            continue
        try:
            if sys.version_info < (3, 10) and isinstance(finder, zipimport.zipimporter):
                # zipimporter.load_module() is deprecated in 3.10 and removed in 3.12
                # The exec_module branch below is the replacement for >= 3.10
                # See: https://docs.python.org/3/library/zipimport.html#zipimport.zipimporter.exec_module
                module = finder.load_module(module_name)
            else:
                spec = finder.find_spec(module_name)
                module = importlib.util.module_from_spec(spec)
                sys.modules[module_name] = module
                spec.loader.exec_module(module)
        except Exception:
            write_string(f'Error while importing module {module_name!r}\n{traceback.format_exc(limit=-1)}')
            continue
        classes.update(load_module(module, module_name, suffix))

    # Compat: old plugin system using __init__.py
    # Note: plugins imported this way do not show up in directories()
    # nor are considered part of the yt_dlp_plugins namespace package
    with contextlib.suppress(FileNotFoundError):
        spec = importlib.util.spec_from_file_location(
            name, Path(get_executable_path(), COMPAT_PACKAGE_NAME, name, '__init__.py'))
        plugins = importlib.util.module_from_spec(spec)
        sys.modules[spec.name] = plugins
        spec.loader.exec_module(plugins)
        classes.update(load_module(plugins, spec.name, suffix))

    return classes


sys.meta_path.insert(0, PluginFinder(f'{PACKAGE_NAME}.extractor', f'{PACKAGE_NAME}.postprocessor'))

__all__ = ['directories', 'load_plugins', 'PACKAGE_NAME', 'COMPAT_PACKAGE_NAME']