1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# -*- coding: utf-8 -*-
# (c) 2021 Matt Martz <matt@sivel.net>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import gzip
import io
import sys
try:
from urllib.response import addinfourl
except ImportError:
from urllib import addinfourl
from ansible.module_utils.six import PY3
from ansible.module_utils.six.moves import http_client
from ansible.module_utils.urls import GzipDecodedReader, Request
import pytest
def compress(data):
    """Return ``data`` compressed as a complete gzip stream.

    :param data: raw bytes to compress.
    :returns: bytes holding the gzip-encoded form of ``data``.
    """
    buf = io.BytesIO()
    # Use a context manager rather than try/finally around the constructor:
    # if ``GzipFile()`` itself raised, a ``finally: f.close()`` would fail on
    # the unbound name and hide the original error.
    with gzip.GzipFile(fileobj=buf, mode='wb') as gz:
        gz.write(data)
    # Read the buffer only after the GzipFile has been closed, so the stream
    # is complete; closing the GzipFile leaves ``buf`` itself open.
    return buf.getvalue()
class Sock(io.BytesIO):
    """In-memory byte buffer that also offers a socket-style ``makefile()``.

    The tests in this file pass instances to ``http_client.HTTPResponse`` in
    place of a real socket.
    """

    def makefile(self, *args, **kwds):
        """Return this buffer itself; every argument is accepted and ignored."""
        return self
@pytest.fixture
def urlopen_mock(mocker):
    """Patch ``urlopen`` as looked up through ``ansible.module_utils.urls`` and return the mock.

    :param mocker: fixture whose ``patch()`` installs the replacement.
    :returns: whatever ``mocker.patch()`` returns for the patched target.
    """
    target = 'ansible.module_utils.urls.urllib_request.urlopen'
    patched = mocker.patch(target)
    return patched
# JSON payload used as the (decoded) body of both canned responses below.
JSON_DATA = b'{"foo": "bar", "baz": "qux", "sandwich": "ham", "tech_level": "pickle", "pop": "corn", "ansible": "awesome"}'

# Canned uncompressed HTTP response. The empty line before ``%s`` terminates
# the header block; without it the body would be read as one more header line
# and the response would carry no body. Content-Length is len(JSON_DATA).
RESP = b'''HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Set-Cookie: foo
Set-Cookie: bar
Content-Length: 108

%s''' % JSON_DATA

# Canned gzip-encoded HTTP response with the same headers plus
# ``Content-Encoding: gzip``; the body is JSON_DATA run through compress().
# NOTE(review): Content-Length: 100 is assumed to equal
# len(compress(JSON_DATA)) -- confirm if the payload or compressor changes.
GZIP_RESP = b'''HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Set-Cookie: foo
Set-Cookie: bar
Content-Encoding: gzip
Content-Length: 100

%s''' % compress(JSON_DATA)
def test_Request_open_gzip(urlopen_mock):
    """``Request.open`` on a gzip-encoded response yields a decoding reader.

    The mocked ``urlopen`` returns the parsed GZIP_RESP; the result's ``fp``
    must be a ``GzipDecodedReader`` and ``read()`` must give back JSON_DATA.
    """
    parsed = http_client.HTTPResponse(Sock(GZIP_RESP), method='GET')
    parsed.begin()

    if not PY3:
        # Python 2: hand back an addinfourl built from the parsed response,
        # with the reason phrase copied onto ``msg``.
        wrapped = addinfourl(
            parsed.fp,
            parsed.msg,
            'http://ansible.com/',
            parsed.status,
        )
        wrapped.msg = parsed.reason
        urlopen_mock.return_value = wrapped
    else:
        urlopen_mock.return_value = parsed

    result = Request().open('GET', 'https://ansible.com/')
    assert isinstance(result.fp, GzipDecodedReader)
    assert result.read() == JSON_DATA
def test_Request_open_not_gzip(urlopen_mock):
    """``Request.open`` on an uncompressed response does not add a gzip reader.

    The mocked ``urlopen`` returns the parsed RESP; the result's ``fp`` must
    not be a ``GzipDecodedReader`` and ``read()`` must give back JSON_DATA.
    """
    parsed = http_client.HTTPResponse(Sock(RESP), method='GET')
    parsed.begin()

    if not PY3:
        # Python 2: hand back an addinfourl built from the parsed response,
        # with the reason phrase copied onto ``msg``.
        wrapped = addinfourl(
            parsed.fp,
            parsed.msg,
            'http://ansible.com/',
            parsed.status,
        )
        wrapped.msg = parsed.reason
        urlopen_mock.return_value = wrapped
    else:
        urlopen_mock.return_value = parsed

    result = Request().open('GET', 'https://ansible.com/')
    assert not isinstance(result.fp, GzipDecodedReader)
    assert result.read() == JSON_DATA
def test_Request_open_decompress_false(urlopen_mock):
    """``Request.open(..., decompress=False)`` returns the body without a gzip reader.

    The mocked ``urlopen`` returns the parsed RESP; the result's ``fp`` must
    not be a ``GzipDecodedReader`` and ``read()`` must give back JSON_DATA.
    """
    parsed = http_client.HTTPResponse(Sock(RESP), method='GET')
    parsed.begin()

    if not PY3:
        # Python 2: hand back an addinfourl built from the parsed response,
        # with the reason phrase copied onto ``msg``.
        wrapped = addinfourl(
            parsed.fp,
            parsed.msg,
            'http://ansible.com/',
            parsed.status,
        )
        wrapped.msg = parsed.reason
        urlopen_mock.return_value = wrapped
    else:
        urlopen_mock.return_value = parsed

    result = Request().open('GET', 'https://ansible.com/', decompress=False)
    assert not isinstance(result.fp, GzipDecodedReader)
    assert result.read() == JSON_DATA
def test_GzipDecodedReader_no_gzip(monkeypatch, mocker):
    """Re-import ``ansible.module_utils.urls`` with ``gzip`` made unimportable.

    After the re-import the module must report ``HAS_GZIP`` as ``False`` and
    ``GzipDecodedReader(None)`` must raise ``MissingModuleError``.
    """
    # Remove both modules from the import cache so the import below goes
    # through the patched ``__import__`` instead of returning cached modules.
    monkeypatch.delitem(sys.modules, 'gzip')
    monkeypatch.delitem(sys.modules, 'ansible.module_utils.urls')

    # Capture the real importer before it is replaced.
    orig_import = __import__

    def _import(name, *args, **kwargs):
        # Fail only for ``gzip``. Everything else is forwarded unchanged,
        # keyword arguments included, so callers using e.g.
        # ``__import__(name, fromlist=[...])`` keep working under the patch.
        if name == 'gzip':
            raise ImportError
        return orig_import(name, *args, **kwargs)

    if PY3:
        mocker.patch('builtins.__import__', _import)
    else:
        mocker.patch('__builtin__.__import__', _import)

    # ``__import__`` with a dotted name returns the top-level package, hence
    # the attribute walk down to the freshly imported ``urls`` module.
    mod = __import__('ansible.module_utils.urls').module_utils.urls
    assert mod.HAS_GZIP is False
    with pytest.raises(mod.MissingModuleError):
        mod.GzipDecodedReader(None)
|