1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
import collections
import random
import urllib.parse
import urllib.request
from ._utils import remove_start
def random_user_agent():
    """Return a Chrome-on-Windows User-Agent string with a randomly picked Chrome version."""
    # Pool of Chrome release numbers (major versions 90 through 97) to draw from.
    chrome_builds = (
        '90.0.4430.212',
        '90.0.4430.24',
        '90.0.4430.70',
        '90.0.4430.72',
        '90.0.4430.85',
        '90.0.4430.93',
        '91.0.4472.101',
        '91.0.4472.106',
        '91.0.4472.114',
        '91.0.4472.124',
        '91.0.4472.164',
        '91.0.4472.19',
        '91.0.4472.77',
        '92.0.4515.107',
        '92.0.4515.115',
        '92.0.4515.131',
        '92.0.4515.159',
        '92.0.4515.43',
        '93.0.4556.0',
        '93.0.4577.15',
        '93.0.4577.63',
        '93.0.4577.82',
        '94.0.4606.41',
        '94.0.4606.54',
        '94.0.4606.61',
        '94.0.4606.71',
        '94.0.4606.81',
        '94.0.4606.85',
        '95.0.4638.17',
        '95.0.4638.50',
        '95.0.4638.54',
        '95.0.4638.69',
        '95.0.4638.74',
        '96.0.4664.18',
        '96.0.4664.45',
        '96.0.4664.55',
        '96.0.4664.93',
        '97.0.4692.20',
    )
    picked_build = random.choice(chrome_builds)

    # The template carries a single %s placeholder where the Chrome version goes.
    ua_template = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36'
    return ua_template % picked_build
class HTTPHeaderDict(collections.UserDict, dict):
    """
    Header mapping whose keys are looked up case-insensitively.

    Keys are stored in title case (e.g. 'user-agent' -> 'User-Agent') and values
    are stored as stripped strings; bytes values are decoded as latin-1 first.
    Several mappings may be given to the constructor: they are applied in order,
    so later ones (and finally any keyword arguments) override earlier ones.
    `None` positional arguments are skipped.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        # Keyword arguments are merged last, so they take the highest priority.
        for mapping in (*args, kwargs):
            if mapping is None:
                continue
            self.update(mapping)

    def __setitem__(self, key, value):
        decoded = value.decode('latin-1') if isinstance(value, bytes) else value
        header_name = key.title()
        super().__setitem__(header_name, str(decoded).strip())

    def __getitem__(self, key):
        header_name = key.title()
        return super().__getitem__(header_name)

    def __delitem__(self, key):
        header_name = key.title()
        super().__delitem__(header_name)

    def __contains__(self, key):
        # Non-string keys are passed through untouched instead of being title-cased.
        if isinstance(key, str):
            key = key.title()
        return super().__contains__(key)
# Default request headers. The User-Agent is chosen once, when this statement runs;
# it is passed as its own mapping to keep the randomised entry apart from the fixed ones.
std_headers = HTTPHeaderDict(
    {'User-Agent': random_user_agent()},
    {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-us,en;q=0.5',
        'Sec-Fetch-Mode': 'navigate',
    },
)
def clean_proxies(proxies: dict, headers: HTTPHeaderDict):
    """
    Normalise the proxy mapping in place.

    - A truthy 'Ytdl-Request-Proxy' header is removed from `headers` and replaces
      the whole mapping with a single 'all' entry.
    - The placeholder value '__noproxy__' becomes None.
    - The 'no' entry is otherwise left untouched.
    - Proxy URLs without a scheme get an 'http://' prefix.
    - Legacy 'socks5' / 'socks' schemes are rewritten to 'socks5h' / 'socks4'.

    Both `proxies` and `headers` are mutated; nothing is returned.
    """
    # Legacy scheme names and the schemes they are rewritten to.
    scheme_aliases = {
        'socks5': 'socks5h',  # compat: socks5 was treated as socks5h
        'socks': 'socks4',  # compat: non-standard
    }

    override = headers.pop('Ytdl-Request-Proxy', None)
    if override:
        # XXX: compat: Ytdl-Request-Proxy takes preference over everything, including NO_PROXY
        proxies.clear()
        proxies['all'] = override

    # Only values of existing keys are reassigned below, so iterating the live view is safe.
    for name, url in proxies.items():
        if url == '__noproxy__':
            proxies[name] = None
            continue
        if name == 'no' or url is None:  # 'no' is a special case
            continue

        try:
            scheme = urllib.request._parse_proxy(url)[0]
        except ValueError:
            # Ignore invalid proxy URLs. Sometimes these may be introduced through environment
            # variables unrelated to proxy settings - e.g. Colab `COLAB_LANGUAGE_SERVER_PROXY`.
            # If the proxy is going to be used, the Request Handler proxy validation will handle it.
            continue

        if scheme is None:
            # Ensure proxies without a scheme are http.
            # NOTE(review): remove_start presumably drops a leading '//' - confirm in ._utils
            proxies[name] = 'http://' + remove_start(url, '//')
        elif scheme in scheme_aliases:
            parsed = urllib.parse.urlparse(url)
            proxies[name] = urllib.parse.urlunparse(
                parsed._replace(scheme=scheme_aliases[scheme]))
def clean_headers(headers: HTTPHeaderDict):
    """
    Strip internal-only headers from `headers` in place.

    If the legacy 'Youtubedl-No-Compression' header is present it is removed and
    'Accept-Encoding' is set to 'identity'. 'Ytdl-socks-proxy' is dropped if present.
    """
    legacy_flag = 'Youtubedl-No-Compression'  # compat
    if legacy_flag in headers:
        headers.pop(legacy_flag)
        headers['Accept-Encoding'] = 'identity'

    headers.pop('Ytdl-socks-proxy', None)
def remove_dot_segments(path):
    """
    Resolve '.' and '..' segments in a URL path (RFC 3986 section 5.2.4).

    Pseudo-code: https://tools.ietf.org/html/rfc3986#section-5.2.4
    Based on https://github.com/urllib3/urllib3/blob/ba49f5c4e19e6bca6827282feb77a3c9f937e64b/src/urllib3/util/url.py#L263
    """
    parts = path.split('/')

    kept = []
    for part in parts:
        if part == '..':
            # Step up one level; at the top there is nothing left to drop.
            if kept:
                kept.pop()
        elif part != '.':
            kept.append(part)

    # A path that started with '/' must still start with '/' after resolution.
    started_with_slash = parts[0] == ''
    if started_with_slash and (not kept or kept[0] != ''):
        kept.insert(0, '')

    # A trailing '.' or '..' refers to a directory, so keep a trailing slash.
    if parts[-1] in ('.', '..'):
        kept.append('')

    return '/'.join(kept)
def escape_rfc3986(s):
    """Percent-encode characters in `s`, such as non-ASCII ones, as suggested by RFC 3986.

    '%' and the URL delimiter characters listed below are left as they are, so
    existing percent-escapes are not encoded a second time.
    """
    untouched = b"%/;:@&=+$,!~*'()?#[]"
    return urllib.parse.quote(s, safe=untouched)
def normalize_url(url):
    """Return `url` normalised as suggested by RFC 3986.

    The host part is IDNA-encoded, dot segments are removed from the path, and
    path, params, query and fragment are percent-escaped.
    """
    parts = urllib.parse.urlparse(url)

    ascii_netloc = parts.netloc.encode('idna').decode('ascii')
    resolved_path = escape_rfc3986(remove_dot_segments(parts.path))
    escaped_params = escape_rfc3986(parts.params)
    escaped_query = escape_rfc3986(parts.query)
    escaped_fragment = escape_rfc3986(parts.fragment)

    normalized = parts._replace(
        netloc=ascii_netloc,
        path=resolved_path,
        params=escaped_params,
        query=escaped_query,
        fragment=escaped_fragment,
    )
    return normalized.geturl()
|