From c7bab7c39fd51c0812f70020172766303191bc01 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 19:37:42 +0200 Subject: Adding upstream version 2023.03.04. Signed-off-by: Daniel Baumann --- yt_dlp/downloader/websocket.py | 53 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 yt_dlp/downloader/websocket.py (limited to 'yt_dlp/downloader/websocket.py') diff --git a/yt_dlp/downloader/websocket.py b/yt_dlp/downloader/websocket.py new file mode 100644 index 0000000..6837ff1 --- /dev/null +++ b/yt_dlp/downloader/websocket.py @@ -0,0 +1,53 @@ +import asyncio +import contextlib +import os +import signal +import threading + +from .common import FileDownloader +from .external import FFmpegFD +from ..dependencies import websockets + + +class FFmpegSinkFD(FileDownloader): + """ A sink to ffmpeg for downloading fragments in any form """ + + def real_download(self, filename, info_dict): + info_copy = info_dict.copy() + info_copy['url'] = '-' + + async def call_conn(proc, stdin): + try: + await self.real_connection(stdin, info_dict) + except OSError: + pass + finally: + with contextlib.suppress(OSError): + stdin.flush() + stdin.close() + os.kill(os.getpid(), signal.SIGINT) + + class FFmpegStdinFD(FFmpegFD): + @classmethod + def get_basename(cls): + return FFmpegFD.get_basename() + + def on_process_started(self, proc, stdin): + thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), )) + thread.start() + + return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy) + + async def real_connection(self, sink, info_dict): + """ Override this in subclasses """ + raise NotImplementedError('This method must be implemented by subclasses') + + +class WebSocketFragmentFD(FFmpegSinkFD): + async def real_connection(self, sink, info_dict): + async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws: + while True: + recv = await ws.recv() + if isinstance(recv, str): + recv = recv.encode('utf8') + sink.write(recv) -- cgit v1.2.3