summaryrefslogtreecommitdiffstats
path: root/yt_dlp/utils/_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/utils/_utils.py')
-rw-r--r--yt_dlp/utils/_utils.py129
1 files changed, 82 insertions, 47 deletions
diff --git a/yt_dlp/utils/_utils.py b/yt_dlp/utils/_utils.py
index 9efeb6a..e3e80f3 100644
--- a/yt_dlp/utils/_utils.py
+++ b/yt_dlp/utils/_utils.py
@@ -5,7 +5,7 @@ import codecs
import collections
import collections.abc
import contextlib
-import datetime
+import datetime as dt
import email.header
import email.utils
import errno
@@ -50,7 +50,6 @@ from ..compat import (
compat_expanduser,
compat_HTMLParseError,
compat_os_name,
- compat_shlex_quote,
)
from ..dependencies import xattr
@@ -836,9 +835,11 @@ class Popen(subprocess.Popen):
if shell and compat_os_name == 'nt' and kwargs.get('executable') is None:
if not isinstance(args, str):
- args = ' '.join(compat_shlex_quote(a) for a in args)
+ args = shell_quote(args, shell=True)
shell = False
- args = f'{self.__comspec()} /Q /S /D /V:OFF /C "{args}"'
+ # Set variable for `cmd.exe` newline escaping (see `utils.shell_quote`)
+ env['='] = '"^\n\n"'
+ args = f'{self.__comspec()} /Q /S /D /V:OFF /E:ON /C "{args}"'
super().__init__(args, *remaining, env=env, shell=shell, **kwargs, startupinfo=self._startupinfo)
@@ -1150,14 +1151,14 @@ def extract_timezone(date_str):
timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
if timezone is not None:
date_str = date_str[:-len(m.group('tz'))]
- timezone = datetime.timedelta(hours=timezone or 0)
+ timezone = dt.timedelta(hours=timezone or 0)
else:
date_str = date_str[:-len(m.group('tz'))]
if not m.group('sign'):
- timezone = datetime.timedelta()
+ timezone = dt.timedelta()
else:
sign = 1 if m.group('sign') == '+' else -1
- timezone = datetime.timedelta(
+ timezone = dt.timedelta(
hours=sign * int(m.group('hours')),
minutes=sign * int(m.group('minutes')))
return timezone, date_str
@@ -1176,8 +1177,8 @@ def parse_iso8601(date_str, delimiter='T', timezone=None):
with contextlib.suppress(ValueError):
date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
- dt = datetime.datetime.strptime(date_str, date_format) - timezone
- return calendar.timegm(dt.timetuple())
+ dt_ = dt.datetime.strptime(date_str, date_format) - timezone
+ return calendar.timegm(dt_.timetuple())
def date_formats(day_first=True):
@@ -1198,12 +1199,12 @@ def unified_strdate(date_str, day_first=True):
for expression in date_formats(day_first):
with contextlib.suppress(ValueError):
- upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
+ upload_date = dt.datetime.strptime(date_str, expression).strftime('%Y%m%d')
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
with contextlib.suppress(ValueError):
- upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
+ upload_date = dt.datetime(*timetuple[:6]).strftime('%Y%m%d')
if upload_date is not None:
return str(upload_date)
@@ -1233,8 +1234,8 @@ def unified_timestamp(date_str, day_first=True):
for expression in date_formats(day_first):
with contextlib.suppress(ValueError):
- dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
- return calendar.timegm(dt.timetuple())
+ dt_ = dt.datetime.strptime(date_str, expression) - timezone + dt.timedelta(hours=pm_delta)
+ return calendar.timegm(dt_.timetuple())
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
@@ -1272,11 +1273,11 @@ def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
if precision == 'auto':
auto_precision = True
precision = 'microsecond'
- today = datetime_round(datetime.datetime.now(datetime.timezone.utc), precision)
+ today = datetime_round(dt.datetime.now(dt.timezone.utc), precision)
if date_str in ('now', 'today'):
return today
if date_str == 'yesterday':
- return today - datetime.timedelta(days=1)
+ return today - dt.timedelta(days=1)
match = re.match(
r'(?P<start>.+)(?P<sign>[+-])(?P<time>\d+)(?P<unit>microsecond|second|minute|hour|day|week|month|year)s?',
date_str)
@@ -1291,13 +1292,13 @@ def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
if unit == 'week':
unit = 'day'
time *= 7
- delta = datetime.timedelta(**{unit + 's': time})
+ delta = dt.timedelta(**{unit + 's': time})
new_date = start_time + delta
if auto_precision:
return datetime_round(new_date, unit)
return new_date
- return datetime_round(datetime.datetime.strptime(date_str, format), precision)
+ return datetime_round(dt.datetime.strptime(date_str, format), precision)
def date_from_str(date_str, format='%Y%m%d', strict=False):
@@ -1312,21 +1313,21 @@ def date_from_str(date_str, format='%Y%m%d', strict=False):
return datetime_from_str(date_str, precision='microsecond', format=format).date()
-def datetime_add_months(dt, months):
+def datetime_add_months(dt_, months):
"""Increment/Decrement a datetime object by months."""
- month = dt.month + months - 1
- year = dt.year + month // 12
+ month = dt_.month + months - 1
+ year = dt_.year + month // 12
month = month % 12 + 1
- day = min(dt.day, calendar.monthrange(year, month)[1])
- return dt.replace(year, month, day)
+ day = min(dt_.day, calendar.monthrange(year, month)[1])
+ return dt_.replace(year, month, day)
-def datetime_round(dt, precision='day'):
+def datetime_round(dt_, precision='day'):
"""
Round a datetime object's time to a specific precision
"""
if precision == 'microsecond':
- return dt
+ return dt_
unit_seconds = {
'day': 86400,
@@ -1335,8 +1336,8 @@ def datetime_round(dt, precision='day'):
'second': 1,
}
roundto = lambda x, n: ((x + n / 2) // n) * n
- timestamp = roundto(calendar.timegm(dt.timetuple()), unit_seconds[precision])
- return datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+ timestamp = roundto(calendar.timegm(dt_.timetuple()), unit_seconds[precision])
+ return dt.datetime.fromtimestamp(timestamp, dt.timezone.utc)
def hyphenate_date(date_str):
@@ -1357,11 +1358,11 @@ class DateRange:
if start is not None:
self.start = date_from_str(start, strict=True)
else:
- self.start = datetime.datetime.min.date()
+ self.start = dt.datetime.min.date()
if end is not None:
self.end = date_from_str(end, strict=True)
else:
- self.end = datetime.datetime.max.date()
+ self.end = dt.datetime.max.date()
if self.start > self.end:
raise ValueError('Date range: "%s" , the start date must be before the end date' % self)
@@ -1372,7 +1373,7 @@ class DateRange:
def __contains__(self, date):
"""Check if the date is in the range"""
- if not isinstance(date, datetime.date):
+ if not isinstance(date, dt.date):
date = date_from_str(date)
return self.start <= date <= self.end
@@ -1637,15 +1638,38 @@ def get_filesystem_encoding():
return encoding if encoding is not None else 'utf-8'
-def shell_quote(args):
- quoted_args = []
- encoding = get_filesystem_encoding()
- for a in args:
- if isinstance(a, bytes):
- # We may get a filename encoded with 'encodeFilename'
- a = a.decode(encoding)
- quoted_args.append(compat_shlex_quote(a))
- return ' '.join(quoted_args)
+_WINDOWS_QUOTE_TRANS = str.maketrans({'"': '\\"', '\\': '\\\\'})
+_CMD_QUOTE_TRANS = str.maketrans({
+ # Keep quotes balanced by replacing them with `""` instead of `\\"`
+ '"': '""',
+ # Requires a variable `=` containing `"^\n\n"` (set in `utils.Popen`)
+ # `=` should be unique since variables containing `=` cannot be set using cmd
+ '\n': '%=%',
+ # While we are only required to escape backslashes immediately before quotes,
+ # we instead escape all of 'em anyways to be consistent
+ '\\': '\\\\',
+ # Use zero length variable replacement so `%` doesn't get expanded
+ # `cd` is always set as long as extensions are enabled (`/E:ON` in `utils.Popen`)
+ '%': '%%cd:~,%',
+})
+
+
+def shell_quote(args, *, shell=False):
+ args = list(variadic(args))
+ if any(isinstance(item, bytes) for item in args):
+ deprecation_warning('Passing bytes to utils.shell_quote is deprecated')
+ encoding = get_filesystem_encoding()
+ for index, item in enumerate(args):
+ if isinstance(item, bytes):
+ args[index] = item.decode(encoding)
+
+ if compat_os_name != 'nt':
+ return shlex.join(args)
+
+ trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
+ return ' '.join(
+ s if re.fullmatch(r'[\w#$*\-+./:?@\\]+', s, re.ASCII) else s.translate(trans).join('""')
+ for s in args)
def smuggle_url(url, data):
@@ -1996,12 +2020,12 @@ def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
if isinstance(timestamp, (int, float)): # unix timestamp
# Using naive datetime here can break timestamp() in Windows
# Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
- # Also, datetime.datetime.fromtimestamp breaks for negative timestamps
+ # Also, dt.datetime.fromtimestamp breaks for negative timestamps
# Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
- datetime_object = (datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
- + datetime.timedelta(seconds=timestamp))
+ datetime_object = (dt.datetime.fromtimestamp(0, dt.timezone.utc)
+ + dt.timedelta(seconds=timestamp))
elif isinstance(timestamp, str): # assume YYYYMMDD
- datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
+ datetime_object = dt.datetime.strptime(timestamp, '%Y%m%d')
date_format = re.sub( # Support %s on windows
r'(?<!%)(%%)*%s', rf'\g<1>{int(datetime_object.timestamp())}', date_format)
return datetime_object.strftime(date_format)
@@ -2849,7 +2873,7 @@ def ytdl_is_updateable():
def args_to_str(args):
# Get a short string representation for a subprocess command
- return ' '.join(compat_shlex_quote(a) for a in args)
+ return shell_quote(args)
def error_to_str(err):
@@ -4490,10 +4514,10 @@ def write_xattr(path, key, value):
def random_birthday(year_field, month_field, day_field):
- start_date = datetime.date(1950, 1, 1)
- end_date = datetime.date(1995, 12, 31)
+ start_date = dt.date(1950, 1, 1)
+ end_date = dt.date(1995, 12, 31)
offset = random.randint(0, (end_date - start_date).days)
- random_date = start_date + datetime.timedelta(offset)
+ random_date = start_date + dt.timedelta(offset)
return {
year_field: str(random_date.year),
month_field: str(random_date.month),
@@ -4672,7 +4696,7 @@ def time_seconds(**kwargs):
"""
Returns TZ-aware time in seconds since the epoch (1970-01-01T00:00:00Z)
"""
- return time.time() + datetime.timedelta(**kwargs).total_seconds()
+ return time.time() + dt.timedelta(**kwargs).total_seconds()
# create a JSON Web Signature (jws) with HS256 algorithm
@@ -5415,6 +5439,17 @@ class FormatSorter:
return tuple(self._calculate_field_preference(format, field) for field in self._order)
+def filesize_from_tbr(tbr, duration):
+ """
+ @param tbr: Total bitrate in kbps (1000 bits/sec)
+ @param duration: Duration in seconds
+ @returns Filesize in bytes
+ """
+ if tbr is None or duration is None:
+ return None
+ return int(duration * tbr * (1000 / 8))
+
+
# XXX: Temporary
class _YDLLogger:
def __init__(self, ydl=None):