diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:03:42 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 16:03:42 +0000 |
commit | 66cec45960ce1d9c794e9399de15c138acb18aed (patch) | |
tree | 59cd19d69e9d56b7989b080da7c20ef1a3fe2a5a /ansible_collections/community/docker/tests/unit/plugins | |
parent | Initial commit. (diff) | |
download | ansible-66cec45960ce1d9c794e9399de15c138acb18aed.tar.xz ansible-66cec45960ce1d9c794e9399de15c138acb18aed.zip |
Adding upstream version 7.3.0+dfsg.upstream/7.3.0+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/docker/tests/unit/plugins')
28 files changed, 5949 insertions, 0 deletions
diff --git a/ansible_collections/community/docker/tests/unit/plugins/connection/test_docker.py b/ansible_collections/community/docker/tests/unit/plugins/connection/test_docker.py new file mode 100644 index 00000000..5ae6a8e1 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/connection/test_docker.py @@ -0,0 +1,57 @@ +# Copyright (c) 2020 Red Hat, Inc. +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from io import StringIO + +from ansible_collections.community.docker.tests.unit.compat import mock +from ansible_collections.community.docker.tests.unit.compat import unittest +from ansible.errors import AnsibleError +from ansible.playbook.play_context import PlayContext +from ansible.plugins.loader import connection_loader + + +class TestDockerConnectionClass(unittest.TestCase): + + def setUp(self): + self.play_context = PlayContext() + self.play_context.prompt = ( + '[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: ' + ) + self.in_stream = StringIO() + with mock.patch('ansible_collections.community.docker.plugins.connection.docker.get_bin_path', return_value='docker'): + self.dc = connection_loader.get('community.docker.docker', self.play_context, self.in_stream) + + def tearDown(self): + pass + + @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._old_docker_version', + return_value=('false', 'garbage', '', 1)) + @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._new_docker_version', + return_value=('docker version', '1.2.3', '', 0)) + def test_docker_connection_module_too_old(self, mock_new_docker_verison, mock_old_docker_version): + self.dc._version = None + self.dc.remote_user = 'foo' + self.assertRaisesRegexp(AnsibleError, '^docker connection type requires docker 1.3 or higher$', self.dc._get_actual_user) + + @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._old_docker_version', + return_value=('false', 'garbage', '', 1)) + @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._new_docker_version', + return_value=('docker version', '1.7.0', '', 0)) + def test_docker_connection_module(self, mock_new_docker_verison, mock_old_docker_version): + self.dc._version = None + version = self.dc.docker_version + + # old version and new version fail + @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._old_docker_version', + return_value=('false', 'garbage', '', 1)) + @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._new_docker_version', + return_value=('false', 'garbage', '', 1)) + def test_docker_connection_module_wrong_cmd(self, mock_new_docker_version, mock_old_docker_version): + self.dc._version = None + self.dc.remote_user = 'foo' + self.assertRaisesRegexp(AnsibleError, '^Docker version check (.*?) failed: ', self.dc._get_actual_user) diff --git a/ansible_collections/community/docker/tests/unit/plugins/inventory/test_docker_containers.py b/ansible_collections/community/docker/tests/unit/plugins/inventory/test_docker_containers.py new file mode 100644 index 00000000..ea16c0d9 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/inventory/test_docker_containers.py @@ -0,0 +1,214 @@ +# Copyright (c), Felix Fontein <felix@fontein.de>, 2020 +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +import pytest + +from ansible.inventory.data import InventoryData + +from ansible_collections.community.docker.plugins.inventory.docker_containers import InventoryModule + + +@pytest.fixture(scope="module") +def inventory(): + r = InventoryModule() + r.inventory = InventoryData() + return r + + +LOVING_THARP = { + 'Id': '7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a', + 'Name': '/loving_tharp', + 'Image': 'sha256:349f492ff18add678364a62a67ce9a13487f14293ae0af1baf02398aa432f385', + 'State': { + 'Running': True, + }, + 'Config': { + 'Image': 'quay.io/ansible/ubuntu1804-test-container:1.21.0', + }, +} + + +LOVING_THARP_STACK = { + 'Id': '7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a', + 'Name': '/loving_tharp', + 'Image': 'sha256:349f492ff18add678364a62a67ce9a13487f14293ae0af1baf02398aa432f385', + 'State': { + 'Running': True, + }, + 'Config': { + 'Image': 'quay.io/ansible/ubuntu1804-test-container:1.21.0', + 'Labels': { + 'com.docker.stack.namespace': 'my_stack', + }, + }, + 'NetworkSettings': { + 'Ports': { + '22/tcp': [ + { + 'HostIp': '0.0.0.0', + 'HostPort': '32802' + } + ], + }, + }, +} + + +LOVING_THARP_SERVICE = { + 'Id': '7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a', + 'Name': '/loving_tharp', + 'Image': 'sha256:349f492ff18add678364a62a67ce9a13487f14293ae0af1baf02398aa432f385', + 'State': { + 'Running': True, + }, + 'Config': { + 'Image': 'quay.io/ansible/ubuntu1804-test-container:1.21.0', + 'Labels': { + 'com.docker.swarm.service.name': 'my_service', + }, + }, +} + + +def create_get_option(options, default=False): + def get_option(option): + if option in options: + return options[option] + return default + + return get_option + + +class FakeClient(object): + def __init__(self, *hosts): + self.get_results = {} + list_reply = [] + for host in hosts: + list_reply.append({ + 'Id': host['Id'], + 'Names': [host['Name']] if host['Name'] else [], + 'Image': host['Config']['Image'], + 'ImageId': host['Image'], + }) + self.get_results['/containers/{0}/json'.format(host['Name'])] = host + self.get_results['/containers/{0}/json'.format(host['Id'])] = host + self.get_results['/containers/json'] = list_reply + + def get_json(self, url, *param, **kwargs): + url = url.format(*param) + return self.get_results[url] + + +def test_populate(inventory, mocker): + client = FakeClient(LOVING_THARP) + + inventory.get_option = mocker.MagicMock(side_effect=create_get_option({ + 'verbose_output': True, + 'connection_type': 'docker-api', + 'add_legacy_groups': False, + 'compose': {}, + 'groups': {}, + 'keyed_groups': {}, + })) + inventory._populate(client) + + host_1 = inventory.inventory.get_host('loving_tharp') + host_1_vars = host_1.get_vars() + + assert host_1_vars['ansible_host'] == 'loving_tharp' + assert host_1_vars['ansible_connection'] == 'community.docker.docker_api' + assert 'ansible_ssh_host' not in host_1_vars + assert 'ansible_ssh_port' not in host_1_vars + assert 'docker_state' in host_1_vars + assert 'docker_config' in host_1_vars + assert 'docker_image' in host_1_vars + + assert len(inventory.inventory.groups['ungrouped'].hosts) == 0 + assert len(inventory.inventory.groups['all'].hosts) == 0 + assert len(inventory.inventory.groups) == 2 + assert len(inventory.inventory.hosts) == 1 + + +def test_populate_service(inventory, mocker): + client = FakeClient(LOVING_THARP_SERVICE) + + inventory.get_option = mocker.MagicMock(side_effect=create_get_option({ + 'verbose_output': False, + 'connection_type': 'docker-cli', + 'add_legacy_groups': True, + 'compose': {}, + 'groups': {}, + 'keyed_groups': {}, + 'docker_host': 'unix://var/run/docker.sock', + })) + inventory._populate(client) + + host_1 = inventory.inventory.get_host('loving_tharp') + host_1_vars = host_1.get_vars() + + assert host_1_vars['ansible_host'] == 'loving_tharp' + assert host_1_vars['ansible_connection'] == 'community.docker.docker' + assert 'ansible_ssh_host' not in host_1_vars + assert 'ansible_ssh_port' not in host_1_vars + assert 'docker_state' not in host_1_vars + assert 'docker_config' not in host_1_vars + assert 'docker_image' not in host_1_vars + + assert len(inventory.inventory.groups['ungrouped'].hosts) == 0 + assert len(inventory.inventory.groups['all'].hosts) == 0 + assert len(inventory.inventory.groups['7bd547963679e'].hosts) == 1 + assert len(inventory.inventory.groups['7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a'].hosts) == 1 + assert len(inventory.inventory.groups['image_quay.io/ansible/ubuntu1804-test-container:1.21.0'].hosts) == 1 + assert len(inventory.inventory.groups['loving_tharp'].hosts) == 1 + assert len(inventory.inventory.groups['running'].hosts) == 1 + assert len(inventory.inventory.groups['stopped'].hosts) == 0 + assert len(inventory.inventory.groups['service_my_service'].hosts) == 1 + assert len(inventory.inventory.groups['unix://var/run/docker.sock'].hosts) == 1 + assert len(inventory.inventory.groups) == 10 + assert len(inventory.inventory.hosts) == 1 + + +def test_populate_stack(inventory, mocker): + client = FakeClient(LOVING_THARP_STACK) + + inventory.get_option = mocker.MagicMock(side_effect=create_get_option({ + 'verbose_output': False, + 'connection_type': 'ssh', + 'add_legacy_groups': True, + 'compose': {}, + 'groups': {}, + 'keyed_groups': {}, + 'docker_host': 'unix://var/run/docker.sock', + 'default_ip': '127.0.0.1', + 'private_ssh_port': 22, + })) + inventory._populate(client) + + host_1 = inventory.inventory.get_host('loving_tharp') + host_1_vars = host_1.get_vars() + + assert host_1_vars['ansible_ssh_host'] == '127.0.0.1' + assert host_1_vars['ansible_ssh_port'] == '32802' + assert 'ansible_host' not in host_1_vars + assert 'ansible_connection' not in host_1_vars + assert 'docker_state' not in host_1_vars + assert 'docker_config' not in host_1_vars + assert 'docker_image' not in host_1_vars + + assert len(inventory.inventory.groups['ungrouped'].hosts) == 0 + assert len(inventory.inventory.groups['all'].hosts) == 0 + assert len(inventory.inventory.groups['7bd547963679e'].hosts) == 1 + assert len(inventory.inventory.groups['7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a'].hosts) == 1 + assert len(inventory.inventory.groups['image_quay.io/ansible/ubuntu1804-test-container:1.21.0'].hosts) == 1 + assert len(inventory.inventory.groups['loving_tharp'].hosts) == 1 + assert len(inventory.inventory.groups['running'].hosts) == 1 + assert len(inventory.inventory.groups['stopped'].hosts) == 0 + assert len(inventory.inventory.groups['stack_my_stack'].hosts) == 1 + assert len(inventory.inventory.groups['unix://var/run/docker.sock'].hosts) == 1 + assert len(inventory.inventory.groups) == 10 + assert len(inventory.inventory.hosts) == 1 diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/api/test_client.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/api/test_client.py new file mode 100644 index 00000000..ea003565 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/api/test_client.py @@ -0,0 +1,702 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import datetime +import io +import json +import os +import re +import shutil +import socket +import struct +import tempfile +import threading +import time +import unittest +import sys + +from ansible.module_utils import six + +import pytest +import requests + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api import constants, errors +from ansible_collections.community.docker.plugins.module_utils._api.api.client import APIClient +from ansible_collections.community.docker.plugins.module_utils._api.constants import DEFAULT_DOCKER_API_VERSION +from ansible_collections.community.docker.plugins.module_utils._api.utils.utils import convert_filters +from requests.packages import urllib3 + +from .. import fake_api + +try: + from unittest import mock +except ImportError: + import mock + + +DEFAULT_TIMEOUT_SECONDS = constants.DEFAULT_TIMEOUT_SECONDS + + +def response(status_code=200, content='', headers=None, reason=None, elapsed=0, + request=None, raw=None): + res = requests.Response() + res.status_code = status_code + if not isinstance(content, six.binary_type): + content = json.dumps(content).encode('ascii') + res._content = content + res.headers = requests.structures.CaseInsensitiveDict(headers or {}) + res.reason = reason + res.elapsed = datetime.timedelta(elapsed) + res.request = request + res.raw = raw + return res + + +def fake_resolve_authconfig(authconfig, registry=None, *args, **kwargs): + return None + + +def fake_inspect_container(self, container, tty=False): + return fake_api.get_fake_inspect_container(tty=tty)[1] + + +def fake_resp(method, url, *args, **kwargs): + key = None + if url in fake_api.fake_responses: + key = url + elif (url, method) in fake_api.fake_responses: + key = (url, method) + if not key: + raise Exception('{method} {url}'.format(method=method, url=url)) + status_code, content = fake_api.fake_responses[key]() + return response(status_code=status_code, content=content) + + +fake_request = mock.Mock(side_effect=fake_resp) + + +def fake_get(self, url, *args, **kwargs): + return fake_request('GET', url, *args, **kwargs) + + +def fake_post(self, url, *args, **kwargs): + return fake_request('POST', url, *args, **kwargs) + + +def fake_put(self, url, *args, **kwargs): + return fake_request('PUT', url, *args, **kwargs) + + +def fake_delete(self, url, *args, **kwargs): + return fake_request('DELETE', url, *args, **kwargs) + + +def fake_read_from_socket(self, response, stream, tty=False, demux=False): + return six.binary_type() + + +url_base = '{prefix}/'.format(prefix=fake_api.prefix) +url_prefix = '{0}v{1}/'.format( + url_base, + constants.DEFAULT_DOCKER_API_VERSION) + + +class BaseAPIClientTest(unittest.TestCase): + def setUp(self): + self.patcher = mock.patch.multiple( + 'ansible_collections.community.docker.plugins.module_utils._api.api.client.APIClient', + get=fake_get, + post=fake_post, + put=fake_put, + delete=fake_delete, + _read_from_socket=fake_read_from_socket + ) + self.patcher.start() + self.client = APIClient(version=DEFAULT_DOCKER_API_VERSION) + + def tearDown(self): + self.client.close() + self.patcher.stop() + + def base_create_payload(self, img='busybox', cmd=None): + if not cmd: + cmd = ['true'] + return {"Tty": False, "Image": img, "Cmd": cmd, + "AttachStdin": False, + "AttachStderr": True, "AttachStdout": True, + "StdinOnce": False, + "OpenStdin": False, "NetworkDisabled": False, + } + + +class DockerApiTest(BaseAPIClientTest): + def test_ctor(self): + with pytest.raises(errors.DockerException) as excinfo: + APIClient(version=1.12) + + assert str( + excinfo.value + ) == 'Version parameter must be a string or None. Found float' + + def test_url_valid_resource(self): + url = self.client._url('/hello/{0}/world', 'somename') + assert url == '{0}{1}'.format(url_prefix, 'hello/somename/world') + + url = self.client._url( + '/hello/{0}/world/{1}', 'somename', 'someothername' + ) + assert url == '{0}{1}'.format( + url_prefix, 'hello/somename/world/someothername' + ) + + url = self.client._url('/hello/{0}/world', 'some?name') + assert url == '{0}{1}'.format(url_prefix, 'hello/some%3Fname/world') + + url = self.client._url("/images/{0}/push", "localhost:5000/image") + assert url == '{0}{1}'.format( + url_prefix, 'images/localhost:5000/image/push' + ) + + def test_url_invalid_resource(self): + with pytest.raises(ValueError): + self.client._url('/hello/{0}/world', ['sakuya', 'izayoi']) + + def test_url_no_resource(self): + url = self.client._url('/simple') + assert url == '{0}{1}'.format(url_prefix, 'simple') + + def test_url_unversioned_api(self): + url = self.client._url( + '/hello/{0}/world', 'somename', versioned_api=False + ) + assert url == '{0}{1}'.format(url_base, 'hello/somename/world') + + def test_version(self): + self.client.version() + + fake_request.assert_called_with( + 'GET', + url_prefix + 'version', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_version_no_api_version(self): + self.client.version(False) + + fake_request.assert_called_with( + 'GET', + url_base + 'version', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_retrieve_server_version(self): + client = APIClient(version="auto") + assert isinstance(client._version, six.string_types) + assert not (client._version == "auto") + client.close() + + def test_auto_retrieve_server_version(self): + version = self.client._retrieve_server_version() + assert isinstance(version, six.string_types) + + def test_info(self): + self.client.info() + + fake_request.assert_called_with( + 'GET', + url_prefix + 'info', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_search(self): + self.client.get_json('/images/search', params={'term': 'busybox'}) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/search', + params={'term': 'busybox'}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_login(self): + self.client.login('sakuya', 'izayoi') + args = fake_request.call_args + assert args[0][0] == 'POST' + assert args[0][1] == url_prefix + 'auth' + assert json.loads(args[1]['data']) == { + 'username': 'sakuya', 'password': 'izayoi' + } + assert args[1]['headers'] == {'Content-Type': 'application/json'} + assert self.client._auth_configs.auths['docker.io'] == { + 'email': None, + 'password': 'izayoi', + 'username': 'sakuya', + 'serveraddress': None, + } + + def test_events(self): + self.client.events() + + fake_request.assert_called_with( + 'GET', + url_prefix + 'events', + params={'since': None, 'until': None, 'filters': None}, + stream=True, + timeout=None + ) + + def test_events_with_since_until(self): + ts = 1356048000 + now = datetime.datetime.utcfromtimestamp(ts) + since = now - datetime.timedelta(seconds=10) + until = now + datetime.timedelta(seconds=10) + + self.client.events(since=since, until=until) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'events', + params={ + 'since': ts - 10, + 'until': ts + 10, + 'filters': None + }, + stream=True, + timeout=None + ) + + def test_events_with_filters(self): + filters = {'event': ['die', 'stop'], + 'container': fake_api.FAKE_CONTAINER_ID} + + self.client.events(filters=filters) + + expected_filters = convert_filters(filters) + fake_request.assert_called_with( + 'GET', + url_prefix + 'events', + params={ + 'since': None, + 'until': None, + 'filters': expected_filters + }, + stream=True, + timeout=None + ) + + def _socket_path_for_client_session(self, client): + socket_adapter = client.get_adapter('http+docker://') + return socket_adapter.socket_path + + def test_url_compatibility_unix(self): + c = APIClient( + base_url="unix://socket", + version=DEFAULT_DOCKER_API_VERSION) + + assert self._socket_path_for_client_session(c) == '/socket' + + def test_url_compatibility_unix_triple_slash(self): + c = APIClient( + base_url="unix:///socket", + version=DEFAULT_DOCKER_API_VERSION) + + assert self._socket_path_for_client_session(c) == '/socket' + + def test_url_compatibility_http_unix_triple_slash(self): + c = APIClient( + base_url="http+unix:///socket", + version=DEFAULT_DOCKER_API_VERSION) + + assert self._socket_path_for_client_session(c) == '/socket' + + def test_url_compatibility_http(self): + c = APIClient( + base_url="http://hostname:1234", + version=DEFAULT_DOCKER_API_VERSION) + + assert c.base_url == "http://hostname:1234" + + def test_url_compatibility_tcp(self): + c = APIClient( + base_url="tcp://hostname:1234", + version=DEFAULT_DOCKER_API_VERSION) + + assert c.base_url == "http://hostname:1234" + + def test_remove_link(self): + self.client.delete_call('/containers/{0}', '3cc2351ab11b', params={'v': False, 'link': True, 'force': False}) + + fake_request.assert_called_with( + 'DELETE', + url_prefix + 'containers/3cc2351ab11b', + params={'v': False, 'link': True, 'force': False}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_stream_helper_decoding(self): + status_code, content = fake_api.fake_responses[url_prefix + 'events']() + content_str = json.dumps(content) + if six.PY3: + content_str = content_str.encode('utf-8') + body = io.BytesIO(content_str) + + # mock a stream interface + raw_resp = urllib3.HTTPResponse(body=body) + setattr(raw_resp._fp, 'chunked', True) + setattr(raw_resp._fp, 'chunk_left', len(body.getvalue()) - 1) + + # pass `decode=False` to the helper + raw_resp._fp.seek(0) + resp = response(status_code=status_code, content=content, raw=raw_resp) + result = next(self.client._stream_helper(resp)) + assert result == content_str + + # pass `decode=True` to the helper + raw_resp._fp.seek(0) + resp = response(status_code=status_code, content=content, raw=raw_resp) + result = next(self.client._stream_helper(resp, decode=True)) + assert result == content + + # non-chunked response, pass `decode=False` to the helper + setattr(raw_resp._fp, 'chunked', False) + raw_resp._fp.seek(0) + resp = response(status_code=status_code, content=content, raw=raw_resp) + result = next(self.client._stream_helper(resp)) + assert result == content_str.decode('utf-8') + + # non-chunked response, pass `decode=True` to the helper + raw_resp._fp.seek(0) + resp = response(status_code=status_code, content=content, raw=raw_resp) + result = next(self.client._stream_helper(resp, decode=True)) + assert result == content + + +class UnixSocketStreamTest(unittest.TestCase): + def setUp(self): + socket_dir = tempfile.mkdtemp() + self.build_context = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, socket_dir) + self.addCleanup(shutil.rmtree, self.build_context) + self.socket_file = os.path.join(socket_dir, 'test_sock.sock') + self.server_socket = self._setup_socket() + self.stop_server = False + server_thread = threading.Thread(target=self.run_server) + server_thread.daemon = True + server_thread.start() + self.response = None + self.request_handler = None + self.addCleanup(server_thread.join) + self.addCleanup(self.stop) + + def stop(self): + self.stop_server = True + + def _setup_socket(self): + server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server_sock.bind(self.socket_file) + # Non-blocking mode so that we can shut the test down easily + server_sock.setblocking(0) + server_sock.listen(5) + return server_sock + + def run_server(self): + try: + while not self.stop_server: + try: + connection, client_address = self.server_socket.accept() + except socket.error: + # Probably no connection to accept yet + time.sleep(0.01) + continue + + connection.setblocking(1) + try: + self.request_handler(connection) + finally: + connection.close() + finally: + self.server_socket.close() + + def early_response_sending_handler(self, connection): + data = b'' + headers = None + + connection.sendall(self.response) + while not headers: + data += connection.recv(2048) + parts = data.split(b'\r\n\r\n', 1) + if len(parts) == 2: + headers, data = parts + + mo = re.search(r'Content-Length: ([0-9]+)', headers.decode()) + assert mo + content_length = int(mo.group(1)) + + while True: + if len(data) >= content_length: + break + + data += connection.recv(2048) + + @pytest.mark.skipif( + constants.IS_WINDOWS_PLATFORM, reason='Unix only' + ) + def test_early_stream_response(self): + self.request_handler = self.early_response_sending_handler + lines = [] + for i in range(0, 50): + line = str(i).encode() + lines += [('%x' % len(line)).encode(), line] + lines.append(b'0') + lines.append(b'') + + self.response = ( + b'HTTP/1.1 200 OK\r\n' + b'Transfer-Encoding: chunked\r\n' + b'\r\n' + ) + b'\r\n'.join(lines) + + with APIClient( + base_url="http+unix://" + self.socket_file, + version=DEFAULT_DOCKER_API_VERSION) as client: + for i in range(5): + try: + params = { + 't': None, + 'remote': None, + 'q': False, + 'nocache': False, + 'rm': False, + 'forcerm': False, + 'pull': False, + 'dockerfile': 'Dockerfile', + } + headers = {'Content-Type': 'application/tar'} + data = b'...' + response = client._post(client._url('/build'), params=params, headers=headers, data=data, stream=True) + stream = client._stream_helper(response, decode=False) + break + except requests.ConnectionError as e: + if i == 4: + raise e + + assert list(stream) == [ + str(i).encode() for i in range(50) + ] + + +@pytest.mark.skip( + 'This test requires starting a networking server and tries to access it. ' + 'This does not work with network separation with Docker-based unit tests, ' + 'but it does work with podman-based unit tests.' +) +class TCPSocketStreamTest(unittest.TestCase): + stdout_data = b''' + Now, those children out there, they're jumping through the + flames in the hope that the god of the fire will make them fruitful. + Really, you can't blame them. After all, what girl would not prefer the + child of a god to that of some acne-scarred artisan? + ''' + stderr_data = b''' + And what of the true God? To whose glory churches and monasteries have been + built on these islands for generations past? Now shall what of Him? + ''' + + @classmethod + def setup_class(cls): + cls.server = six.moves.socketserver.ThreadingTCPServer( + ('', 0), cls.get_handler_class()) + cls.thread = threading.Thread(target=cls.server.serve_forever) + cls.thread.daemon = True + cls.thread.start() + cls.address = 'http://{0}:{1}'.format( + socket.gethostname(), cls.server.server_address[1]) + + @classmethod + def teardown_class(cls): + cls.server.shutdown() + cls.server.server_close() + cls.thread.join() + + @classmethod + def get_handler_class(cls): + stdout_data = cls.stdout_data + stderr_data = cls.stderr_data + + class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object): + def do_POST(self): + resp_data = self.get_resp_data() + self.send_response(101) + self.send_header( + 'Content-Type', 'application/vnd.docker.raw-stream') + self.send_header('Connection', 'Upgrade') + self.send_header('Upgrade', 'tcp') + self.end_headers() + self.wfile.flush() + time.sleep(0.2) + self.wfile.write(resp_data) + self.wfile.flush() + + def get_resp_data(self): + path = self.path.split('/')[-1] + if path == 'tty': + return stdout_data + stderr_data + elif path == 'no-tty': + data = b'' + data += self.frame_header(1, stdout_data) + data += stdout_data + data += self.frame_header(2, stderr_data) + data += stderr_data + return data + else: + raise Exception('Unknown path {path}'.format(path=path)) + + @staticmethod + def frame_header(stream, data): + return struct.pack('>BxxxL', stream, len(data)) + + return Handler + + def request(self, stream=None, tty=None, demux=None): + assert stream is not None and tty is not None and demux is not None + with APIClient( + base_url=self.address, + version=DEFAULT_DOCKER_API_VERSION, + ) as client: + if tty: + url = client._url('/tty') + else: + url = client._url('/no-tty') + resp = client._post(url, stream=True) + return client._read_from_socket( + resp, stream=stream, tty=tty, demux=demux) + + def test_read_from_socket_tty(self): + res = self.request(stream=True, tty=True, demux=False) + assert next(res) == self.stdout_data + self.stderr_data + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_tty_demux(self): + res = self.request(stream=True, tty=True, demux=True) + assert next(res) == (self.stdout_data + self.stderr_data, None) + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_no_tty(self): + res = self.request(stream=True, tty=False, demux=False) + assert next(res) == self.stdout_data + assert next(res) == self.stderr_data + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_no_tty_demux(self): + res = self.request(stream=True, tty=False, demux=True) + assert (self.stdout_data, None) == next(res) + assert (None, self.stderr_data) == next(res) + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_no_stream_tty(self): + res = self.request(stream=False, tty=True, demux=False) + assert res == self.stdout_data + self.stderr_data + + def test_read_from_socket_no_stream_tty_demux(self): + res = self.request(stream=False, tty=True, demux=True) + assert res == (self.stdout_data + self.stderr_data, None) + + def test_read_from_socket_no_stream_no_tty(self): + res = self.request(stream=False, tty=False, demux=False) + res == self.stdout_data + self.stderr_data + + def test_read_from_socket_no_stream_no_tty_demux(self): + res = self.request(stream=False, tty=False, demux=True) + assert res == (self.stdout_data, self.stderr_data) + + +class UserAgentTest(unittest.TestCase): + def setUp(self): + self.patcher = mock.patch.object( + APIClient, + 'send', + return_value=fake_resp("GET", "%s/version" % fake_api.prefix) + ) + self.mock_send = self.patcher.start() + + def tearDown(self): + self.patcher.stop() + + def test_default_user_agent(self): + client = APIClient(version=DEFAULT_DOCKER_API_VERSION) + client.version() + + assert self.mock_send.call_count == 1 + headers = self.mock_send.call_args[0][0].headers + expected = 'ansible-community.docker' + assert headers['User-Agent'] == expected + + def test_custom_user_agent(self): + client = APIClient( + user_agent='foo/bar', + version=DEFAULT_DOCKER_API_VERSION) + client.version() + + assert self.mock_send.call_count == 1 + headers = self.mock_send.call_args[0][0].headers + assert headers['User-Agent'] == 'foo/bar' + + +class DisableSocketTest(unittest.TestCase): + class DummySocket: + def __init__(self, timeout=60): + self.timeout = timeout + + def settimeout(self, timeout): + self.timeout = timeout + + def gettimeout(self): + return self.timeout + + def setUp(self): + self.client = APIClient(version=DEFAULT_DOCKER_API_VERSION) + + def test_disable_socket_timeout(self): + """Test that the timeout is disabled on a generic socket object.""" + socket = self.DummySocket() + + self.client._disable_socket_timeout(socket) + + assert socket.timeout is None + + def test_disable_socket_timeout2(self): + """Test that the timeouts are disabled on a generic socket object + and it's _sock object if present.""" + socket = self.DummySocket() + socket._sock = self.DummySocket() + + self.client._disable_socket_timeout(socket) + + assert socket.timeout is None + assert socket._sock.timeout is None + + def test_disable_socket_timout_non_blocking(self): + """Test that a non-blocking socket does not get set to blocking.""" + socket = self.DummySocket() + socket._sock = self.DummySocket(0.0) + + self.client._disable_socket_timeout(socket) + + assert socket.timeout is None + assert socket._sock.timeout == 0.0 diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_api.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_api.py new file mode 100644 index 00000000..b794ff3d --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_api.py @@ -0,0 +1,668 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible_collections.community.docker.plugins.module_utils._api import constants + +from . import fake_stat + +CURRENT_VERSION = 'v{api_version}'.format(api_version=constants.DEFAULT_DOCKER_API_VERSION) + +FAKE_CONTAINER_ID = '3cc2351ab11b' +FAKE_IMAGE_ID = 'e9aa60c60128' +FAKE_EXEC_ID = 'd5d177f121dc' +FAKE_NETWORK_ID = '33fb6a3462b8' +FAKE_IMAGE_NAME = 'test_image' +FAKE_TARBALL_PATH = '/path/to/tarball' +FAKE_REPO_NAME = 'repo' +FAKE_TAG_NAME = 'tag' +FAKE_FILE_NAME = 'file' +FAKE_URL = 'myurl' +FAKE_PATH = '/path' +FAKE_VOLUME_NAME = 'perfectcherryblossom' +FAKE_NODE_ID = '24ifsmvkjbyhk' +FAKE_SECRET_ID = 'epdyrw4tsi03xy3deu8g8ly6o' +FAKE_SECRET_NAME = 'super_secret' + +# Each method is prefixed with HTTP method (get, post...) +# for clarity and readability + + +def get_fake_version(): + status_code = 200 + response = { + 'ApiVersion': '1.35', + 'Arch': 'amd64', + 'BuildTime': '2018-01-10T20:09:37.000000000+00:00', + 'Components': [{ + 'Details': { + 'ApiVersion': '1.35', + 'Arch': 'amd64', + 'BuildTime': '2018-01-10T20:09:37.000000000+00:00', + 'Experimental': 'false', + 'GitCommit': '03596f5', + 'GoVersion': 'go1.9.2', + 'KernelVersion': '4.4.0-112-generic', + 'MinAPIVersion': '1.12', + 'Os': 'linux' + }, + 'Name': 'Engine', + 'Version': '18.01.0-ce' + }], + 'GitCommit': '03596f5', + 'GoVersion': 'go1.9.2', + 'KernelVersion': '4.4.0-112-generic', + 'MinAPIVersion': '1.12', + 'Os': 'linux', + 'Platform': {'Name': ''}, + 'Version': '18.01.0-ce' + } + + return status_code, response + + +def get_fake_info(): + status_code = 200 + response = {'Containers': 1, 'Images': 1, 'Debug': False, + 'MemoryLimit': False, 'SwapLimit': False, + 'IPv4Forwarding': True} + return status_code, response + + +def post_fake_auth(): + status_code = 200 + response = {'Status': 'Login Succeeded', + 'IdentityToken': '9cbaf023786cd7'} + return status_code, response + + +def get_fake_ping(): + return 200, "OK" + + +def get_fake_search(): + status_code = 200 + response = [{'Name': 'busybox', 'Description': 'Fake Description'}] + return status_code, response + + +def get_fake_images(): + status_code = 200 + response = [{ + 'Id': FAKE_IMAGE_ID, + 'Created': '2 days ago', + 'Repository': 'busybox', + 'RepoTags': ['busybox:latest', 'busybox:1.0'], + }] + return status_code, response + + +def get_fake_image_history(): + status_code = 200 + response = [ + { + "Id": "b750fe79269d", + "Created": 1364102658, + "CreatedBy": "/bin/bash" + }, + { + "Id": "27cf78414709", + "Created": 1364068391, + "CreatedBy": "" + } + ] + + return status_code, response + + +def post_fake_import_image(): + status_code = 200 + response = 'Import messages...' + + return status_code, response + + +def get_fake_containers(): + status_code = 200 + response = [{ + 'Id': FAKE_CONTAINER_ID, + 'Image': 'busybox:latest', + 'Created': '2 days ago', + 'Command': 'true', + 'Status': 'fake status' + }] + return status_code, response + + +def post_fake_start_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_resize_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_create_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def get_fake_inspect_container(tty=False): + status_code = 200 + response = { + 'Id': FAKE_CONTAINER_ID, + 'Config': {'Labels': {'foo': 'bar'}, 'Privileged': True, 'Tty': tty}, + 'ID': FAKE_CONTAINER_ID, + 'Image': 'busybox:latest', + 'Name': 'foobar', + "State": { + "Status": "running", + "Running": True, + "Pid": 0, + "ExitCode": 0, + "StartedAt": "2013-09-25T14:01:18.869545111+02:00", + "Ghost": False + }, + "HostConfig": { + "LogConfig": { + "Type": "json-file", + "Config": {} + }, + }, + "MacAddress": "02:42:ac:11:00:0a" + } + return status_code, response + + +def get_fake_inspect_image(): + status_code = 200 + response = { + 'Id': FAKE_IMAGE_ID, + 'Parent': "27cf784147099545", + 'Created': "2013-03-23T22:24:18.818426-07:00", + 'Container': FAKE_CONTAINER_ID, + 'Config': {'Labels': {'bar': 'foo'}}, + 'ContainerConfig': + { + "Hostname": "", + "User": "", + "Memory": 0, + "MemorySwap": 0, + "AttachStdin": False, + "AttachStdout": False, + "AttachStderr": False, + "PortSpecs": "", + "Tty": True, + "OpenStdin": True, + "StdinOnce": False, + "Env": "", + "Cmd": ["/bin/bash"], + "Dns": "", + "Image": "base", + "Volumes": "", + "VolumesFrom": "", + "WorkingDir": "" + }, + 'Size': 6823592 + } + return status_code, response + + +def get_fake_insert_image(): + status_code = 200 + response = {'StatusCode': 0} + return status_code, response + + +def get_fake_wait(): + status_code = 200 + response = {'StatusCode': 0} + return status_code, response + + +def get_fake_logs(): + status_code = 200 + response = (b'\x01\x00\x00\x00\x00\x00\x00\x00' + b'\x02\x00\x00\x00\x00\x00\x00\x00' + b'\x01\x00\x00\x00\x00\x00\x00\x11Flowering Nights\n' + b'\x01\x00\x00\x00\x00\x00\x00\x10(Sakuya Iyazoi)\n') + return status_code, response + + +def get_fake_diff(): + status_code = 200 + response = [{'Path': '/test', 'Kind': 1}] + return status_code, response + + +def get_fake_events(): + status_code = 200 + response = [{'status': 'stop', 'id': FAKE_CONTAINER_ID, + 'from': FAKE_IMAGE_ID, 'time': 1423247867}] + return status_code, response + + +def get_fake_export(): + status_code = 200 + response = 'Byte Stream....' + return status_code, response + + +def post_fake_exec_create(): + status_code = 200 + response = {'Id': FAKE_EXEC_ID} + return status_code, response + + +def post_fake_exec_start(): + status_code = 200 + response = (b'\x01\x00\x00\x00\x00\x00\x00\x11bin\nboot\ndev\netc\n' + b'\x01\x00\x00\x00\x00\x00\x00\x12lib\nmnt\nproc\nroot\n' + b'\x01\x00\x00\x00\x00\x00\x00\x0csbin\nusr\nvar\n') + return status_code, response + + +def post_fake_exec_resize(): + status_code = 201 + return status_code, '' + + +def get_fake_exec_inspect(): + return 200, { + 'OpenStderr': True, + 'OpenStdout': True, + 'Container': get_fake_inspect_container()[1], + 'Running': False, + 'ProcessConfig': { + 'arguments': ['hello world'], + 'tty': False, + 'entrypoint': 'echo', + 'privileged': False, + 'user': '' + }, + 'ExitCode': 0, + 'ID': FAKE_EXEC_ID, + 'OpenStdin': False + } + + +def post_fake_stop_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_kill_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_pause_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_unpause_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_restart_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_rename_container(): + status_code = 204 + return status_code, None + + +def delete_fake_remove_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_image_create(): + status_code = 200 + response = {'Id': FAKE_IMAGE_ID} + return status_code, response + + +def delete_fake_remove_image(): + status_code = 200 + response = {'Id': FAKE_IMAGE_ID} + return status_code, response + + +def get_fake_get_image(): + status_code = 200 + response = 'Byte Stream....' + return status_code, response + + +def post_fake_load_image(): + status_code = 200 + response = {'Id': FAKE_IMAGE_ID} + return status_code, response + + +def post_fake_commit(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_push(): + status_code = 200 + response = {'Id': FAKE_IMAGE_ID} + return status_code, response + + +def post_fake_build_container(): + status_code = 200 + response = {'Id': FAKE_CONTAINER_ID} + return status_code, response + + +def post_fake_tag_image(): + status_code = 200 + response = {'Id': FAKE_IMAGE_ID} + return status_code, response + + +def get_fake_stats(): + status_code = 200 + response = fake_stat.OBJ + return status_code, response + + +def get_fake_top(): + return 200, { + 'Processes': [ + [ + 'root', + '26501', + '6907', + '0', + '10:32', + 'pts/55', + '00:00:00', + 'sleep 60', + ], + ], + 'Titles': [ + 'UID', + 'PID', + 'PPID', + 'C', + 'STIME', + 'TTY', + 'TIME', + 'CMD', + ], + } + + +def get_fake_volume_list(): + status_code = 200 + response = { + 'Volumes': [ + { + 'Name': 'perfectcherryblossom', + 'Driver': 'local', + 'Mountpoint': '/var/lib/docker/volumes/perfectcherryblossom', + 'Scope': 'local' + }, { + 'Name': 'subterraneananimism', + 'Driver': 'local', + 'Mountpoint': '/var/lib/docker/volumes/subterraneananimism', + 'Scope': 'local' + } + ] + } + return status_code, response + + +def get_fake_volume(): + status_code = 200 + response = { + 'Name': 'perfectcherryblossom', + 'Driver': 'local', + 'Mountpoint': '/var/lib/docker/volumes/perfectcherryblossom', + 'Labels': { + 'com.example.some-label': 'some-value' + }, + 'Scope': 'local' + } + return status_code, response + + +def fake_remove_volume(): + return 204, None + + +def post_fake_update_container(): + return 200, {'Warnings': []} + + +def post_fake_update_node(): + return 200, None + + +def post_fake_join_swarm(): + return 200, None + + +def get_fake_network_list(): + return 200, [{ + "Name": "bridge", + "Id": FAKE_NETWORK_ID, + "Scope": "local", + "Driver": "bridge", + "EnableIPv6": False, + "Internal": False, + "IPAM": { + "Driver": "default", + "Config": [ + { + "Subnet": "172.17.0.0/16" + } + ] + }, + "Containers": { + FAKE_CONTAINER_ID: { + "EndpointID": "ed2419a97c1d99", + "MacAddress": "02:42:ac:11:00:02", + "IPv4Address": "172.17.0.2/16", + "IPv6Address": "" + } + }, + "Options": { + "com.docker.network.bridge.default_bridge": "true", + "com.docker.network.bridge.enable_icc": "true", + "com.docker.network.bridge.enable_ip_masquerade": "true", + "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0", + "com.docker.network.bridge.name": "docker0", + "com.docker.network.driver.mtu": "1500" + } + }] + + +def get_fake_network(): + return 200, get_fake_network_list()[1][0] + + +def post_fake_network(): + return 201, {"Id": FAKE_NETWORK_ID, "Warnings": []} + + +def delete_fake_network(): + return 204, None + + +def post_fake_network_connect(): + return 200, None + + +def post_fake_network_disconnect(): + return 200, None + + +def post_fake_secret(): + status_code = 200 + response = {'ID': FAKE_SECRET_ID} + return status_code, response + + +# Maps real api url to fake response callback +prefix = 'http+docker://localhost' +if constants.IS_WINDOWS_PLATFORM: + prefix = 'http+docker://localnpipe' + +fake_responses = { + '{prefix}/version'.format(prefix=prefix): + get_fake_version, + '{prefix}/{CURRENT_VERSION}/version'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_version, + '{prefix}/{CURRENT_VERSION}/info'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_info, + '{prefix}/{CURRENT_VERSION}/auth'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_auth, + '{prefix}/{CURRENT_VERSION}/_ping'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_ping, + '{prefix}/{CURRENT_VERSION}/images/search'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_search, + '{prefix}/{CURRENT_VERSION}/images/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_images, + '{prefix}/{CURRENT_VERSION}/images/test_image/history'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_image_history, + '{prefix}/{CURRENT_VERSION}/images/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_import_image, + '{prefix}/{CURRENT_VERSION}/containers/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_containers, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/start'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_start_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/resize'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_resize_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_inspect_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/rename'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_rename_container, + '{prefix}/{CURRENT_VERSION}/images/e9aa60c60128/tag'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_tag_image, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/wait'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_wait, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/logs'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_logs, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/changes'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_diff, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/export'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_export, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/update'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_update_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/exec'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_exec_create, + '{prefix}/{CURRENT_VERSION}/exec/d5d177f121dc/start'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_exec_start, + '{prefix}/{CURRENT_VERSION}/exec/d5d177f121dc/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_exec_inspect, + '{prefix}/{CURRENT_VERSION}/exec/d5d177f121dc/resize'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_exec_resize, + + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/stats'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_stats, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/top'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_top, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/stop'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_stop_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/kill'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_kill_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/pause'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_pause_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/unpause'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_unpause_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/restart'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_restart_container, + '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + delete_fake_remove_container, + '{prefix}/{CURRENT_VERSION}/images/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_image_create, + '{prefix}/{CURRENT_VERSION}/images/e9aa60c60128'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + delete_fake_remove_image, + '{prefix}/{CURRENT_VERSION}/images/e9aa60c60128/get'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_get_image, + '{prefix}/{CURRENT_VERSION}/images/load'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_load_image, + '{prefix}/{CURRENT_VERSION}/images/test_image/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_inspect_image, + '{prefix}/{CURRENT_VERSION}/images/test_image/insert'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_insert_image, + '{prefix}/{CURRENT_VERSION}/images/test_image/push'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_push, + '{prefix}/{CURRENT_VERSION}/commit'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_commit, + '{prefix}/{CURRENT_VERSION}/containers/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_create_container, + '{prefix}/{CURRENT_VERSION}/build'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_build_container, + '{prefix}/{CURRENT_VERSION}/events'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + get_fake_events, + ('{prefix}/{CURRENT_VERSION}/volumes'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'GET'): + get_fake_volume_list, + ('{prefix}/{CURRENT_VERSION}/volumes/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'POST'): + get_fake_volume, + ('{1}/{0}/volumes/{2}'.format( + CURRENT_VERSION, prefix, FAKE_VOLUME_NAME + ), 'GET'): + get_fake_volume, + ('{1}/{0}/volumes/{2}'.format( + CURRENT_VERSION, prefix, FAKE_VOLUME_NAME + ), 'DELETE'): + fake_remove_volume, + ('{1}/{0}/nodes/{2}/update?version=1'.format( + CURRENT_VERSION, prefix, FAKE_NODE_ID + ), 'POST'): + post_fake_update_node, + ('{prefix}/{CURRENT_VERSION}/swarm/join'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'POST'): + post_fake_join_swarm, + ('{prefix}/{CURRENT_VERSION}/networks'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'GET'): + get_fake_network_list, + ('{prefix}/{CURRENT_VERSION}/networks/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'POST'): + post_fake_network, + ('{1}/{0}/networks/{2}'.format( + CURRENT_VERSION, prefix, FAKE_NETWORK_ID + ), 'GET'): + get_fake_network, + ('{1}/{0}/networks/{2}'.format( + CURRENT_VERSION, prefix, FAKE_NETWORK_ID + ), 'DELETE'): + delete_fake_network, + ('{1}/{0}/networks/{2}/connect'.format( + CURRENT_VERSION, prefix, FAKE_NETWORK_ID + ), 'POST'): + post_fake_network_connect, + ('{1}/{0}/networks/{2}/disconnect'.format( + CURRENT_VERSION, prefix, FAKE_NETWORK_ID + ), 'POST'): + post_fake_network_disconnect, + '{prefix}/{CURRENT_VERSION}/secrets/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION): + post_fake_secret, +} diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_stat.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_stat.py new file mode 100644 index 00000000..97547328 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_stat.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +OBJ = { + "read": "2015-02-11T19:20:46.667237763+02:00", + "network": { + "rx_bytes": 567224, + "rx_packets": 3773, + "rx_errors": 0, + "rx_dropped": 0, + "tx_bytes": 1176, + "tx_packets": 13, + "tx_errors": 0, + "tx_dropped": 0 + }, + "cpu_stats": { + "cpu_usage": { + "total_usage": 157260874053, + "percpu_usage": [ + 52196306950, + 24118413549, + 53292684398, + 27653469156 + ], + "usage_in_kernelmode": 37140000000, + "usage_in_usermode": 62140000000 + }, + "system_cpu_usage": 3.0881377e+14, + "throttling_data": { + "periods": 0, + "throttled_periods": 0, + "throttled_time": 0 + } + }, + "memory_stats": { + "usage": 179314688, + "max_usage": 258166784, + "stats": { + "active_anon": 90804224, + "active_file": 2195456, + "cache": 3096576, + "hierarchical_memory_limit": 1.844674407371e+19, + "inactive_anon": 85516288, + "inactive_file": 798720, + "mapped_file": 2646016, + "pgfault": 101034, + "pgmajfault": 1207, + "pgpgin": 115814, + "pgpgout": 75613, + "rss": 176218112, + "rss_huge": 12582912, + "total_active_anon": 90804224, + "total_active_file": 2195456, + "total_cache": 3096576, + "total_inactive_anon": 85516288, + "total_inactive_file": 798720, + "total_mapped_file": 2646016, + "total_pgfault": 101034, + "total_pgmajfault": 1207, + "total_pgpgin": 115814, + "total_pgpgout": 75613, + "total_rss": 176218112, + "total_rss_huge": 12582912, + "total_unevictable": 0, + "total_writeback": 0, + "unevictable": 0, + "writeback": 0 + }, + "failcnt": 0, + "limit": 8039038976 + }, + "blkio_stats": { + "io_service_bytes_recursive": [ + { + "major": 8, + "minor": 0, + "op": "Read", + "value": 72843264 + }, { + "major": 8, + "minor": 0, + "op": "Write", + "value": 4096 + }, { + "major": 8, + "minor": 0, + "op": "Sync", + "value": 4096 + }, { + "major": 8, + "minor": 0, + "op": "Async", + "value": 72843264 + }, { + "major": 8, + "minor": 0, + "op": "Total", + "value": 72847360 + } + ], + "io_serviced_recursive": [ + { + "major": 8, + "minor": 0, + "op": "Read", + "value": 10581 + }, { + "major": 8, + "minor": 0, + "op": "Write", + "value": 1 + }, { + "major": 8, + "minor": 0, + "op": "Sync", + "value": 1 + }, { + "major": 8, + "minor": 0, + "op": "Async", + "value": 10581 + }, { + "major": 8, + "minor": 0, + "op": "Total", + "value": 10582 + } + ], + "io_queue_recursive": [], + "io_service_time_recursive": [], + "io_wait_time_recursive": [], + "io_merged_recursive": [], + "io_time_recursive": [], + "sectors_recursive": [] + } +} diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_auth.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_auth.py new file mode 100644 index 00000000..b3b5a351 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_auth.py @@ -0,0 +1,819 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import base64 +import json +import os +import os.path +import random +import shutil +import tempfile +import unittest +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api import auth, errors +from ansible_collections.community.docker.plugins.module_utils._api.credentials.errors import CredentialsNotFound +from ansible_collections.community.docker.plugins.module_utils._api.credentials.store import Store + +try: + from unittest import mock +except ImportError: + import mock + + +class RegressionTest(unittest.TestCase): + def test_803_urlsafe_encode(self): + auth_data = { + 'username': 'root', + 'password': 'GR?XGR?XGR?XGR?X' + } + encoded = auth.encode_header(auth_data) + assert b'/' not in encoded + assert b'_' in encoded + + +class ResolveRepositoryNameTest(unittest.TestCase): + def test_resolve_repository_name_hub_library_image(self): + assert auth.resolve_repository_name('image') == ( + 'docker.io', 'image' + ) + + def test_resolve_repository_name_dotted_hub_library_image(self): + assert auth.resolve_repository_name('image.valid') == ( + 'docker.io', 'image.valid' + ) + + def test_resolve_repository_name_hub_image(self): + assert auth.resolve_repository_name('username/image') == ( + 'docker.io', 'username/image' + ) + + def test_explicit_hub_index_library_image(self): + assert auth.resolve_repository_name('docker.io/image') == ( + 'docker.io', 'image' + ) + + def test_explicit_legacy_hub_index_library_image(self): + assert auth.resolve_repository_name('index.docker.io/image') == ( + 'docker.io', 'image' + ) + + def test_resolve_repository_name_private_registry(self): + assert auth.resolve_repository_name('my.registry.net/image') == ( + 'my.registry.net', 'image' + ) + + def test_resolve_repository_name_private_registry_with_port(self): + assert auth.resolve_repository_name('my.registry.net:5000/image') == ( + 'my.registry.net:5000', 'image' + ) + + def test_resolve_repository_name_private_registry_with_username(self): + assert auth.resolve_repository_name( + 'my.registry.net/username/image' + ) == ('my.registry.net', 'username/image') + + def test_resolve_repository_name_no_dots_but_port(self): + assert auth.resolve_repository_name('hostname:5000/image') == ( + 'hostname:5000', 'image' + ) + + def test_resolve_repository_name_no_dots_but_port_and_username(self): + assert auth.resolve_repository_name( + 'hostname:5000/username/image' + ) == ('hostname:5000', 'username/image') + + def test_resolve_repository_name_localhost(self): + assert auth.resolve_repository_name('localhost/image') == ( + 'localhost', 'image' + ) + + def test_resolve_repository_name_localhost_with_username(self): + assert auth.resolve_repository_name('localhost/username/image') == ( + 'localhost', 'username/image' + ) + + def test_invalid_index_name(self): + with pytest.raises(errors.InvalidRepository): + auth.resolve_repository_name('-gecko.com/image') + + +def encode_auth(auth_info): + return base64.b64encode( + auth_info.get('username', '').encode('utf-8') + b':' + + auth_info.get('password', '').encode('utf-8')) + + +class ResolveAuthTest(unittest.TestCase): + index_config = {'auth': encode_auth({'username': 'indexuser'})} + private_config = {'auth': encode_auth({'username': 'privateuser'})} + legacy_config = {'auth': encode_auth({'username': 'legacyauth'})} + + auth_config = auth.AuthConfig({ + 'auths': auth.parse_auth({ + 'https://index.docker.io/v1/': index_config, + 'my.registry.net': private_config, + 'http://legacy.registry.url/v1/': legacy_config, + }) + }) + + def test_resolve_authconfig_hostname_only(self): + assert auth.resolve_authconfig( + self.auth_config, 'my.registry.net' + )['username'] == 'privateuser' + + def test_resolve_authconfig_no_protocol(self): + assert auth.resolve_authconfig( + self.auth_config, 'my.registry.net/v1/' + )['username'] == 'privateuser' + + def test_resolve_authconfig_no_path(self): + assert auth.resolve_authconfig( + self.auth_config, 'http://my.registry.net' + )['username'] == 'privateuser' + + def test_resolve_authconfig_no_path_trailing_slash(self): + assert auth.resolve_authconfig( + self.auth_config, 'http://my.registry.net/' + )['username'] == 'privateuser' + + def test_resolve_authconfig_no_path_wrong_secure_proto(self): + assert auth.resolve_authconfig( + self.auth_config, 'https://my.registry.net' + )['username'] == 'privateuser' + + def test_resolve_authconfig_no_path_wrong_insecure_proto(self): + assert auth.resolve_authconfig( + self.auth_config, 'http://index.docker.io' + )['username'] == 'indexuser' + + def test_resolve_authconfig_path_wrong_proto(self): + assert auth.resolve_authconfig( + self.auth_config, 'https://my.registry.net/v1/' + )['username'] == 'privateuser' + + def test_resolve_authconfig_default_registry(self): + assert auth.resolve_authconfig( + self.auth_config + )['username'] == 'indexuser' + + def test_resolve_authconfig_default_explicit_none(self): + assert auth.resolve_authconfig( + self.auth_config, None + )['username'] == 'indexuser' + + def test_resolve_authconfig_fully_explicit(self): + assert auth.resolve_authconfig( + self.auth_config, 'http://my.registry.net/v1/' + )['username'] == 'privateuser' + + def test_resolve_authconfig_legacy_config(self): + assert auth.resolve_authconfig( + self.auth_config, 'legacy.registry.url' + )['username'] == 'legacyauth' + + def test_resolve_authconfig_no_match(self): + assert auth.resolve_authconfig( + self.auth_config, 'does.not.exist' + ) is None + + def test_resolve_registry_and_auth_library_image(self): + image = 'image' + assert auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + )['username'] == 'indexuser' + + def test_resolve_registry_and_auth_hub_image(self): + image = 'username/image' + assert auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + )['username'] == 'indexuser' + + def test_resolve_registry_and_auth_explicit_hub(self): + image = 'docker.io/username/image' + assert auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + )['username'] == 'indexuser' + + def test_resolve_registry_and_auth_explicit_legacy_hub(self): + image = 'index.docker.io/username/image' + assert auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + )['username'] == 'indexuser' + + def test_resolve_registry_and_auth_private_registry(self): + image = 'my.registry.net/image' + assert auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + )['username'] == 'privateuser' + + def test_resolve_registry_and_auth_unauthenticated_registry(self): + image = 'other.registry.net/image' + assert auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + ) is None + + def test_resolve_auth_with_empty_credstore_and_auth_dict(self): + auth_config = auth.AuthConfig({ + 'auths': auth.parse_auth({ + 'https://index.docker.io/v1/': self.index_config, + }), + 'credsStore': 'blackbox' + }) + with mock.patch( + 'ansible_collections.community.docker.plugins.module_utils._api.auth.AuthConfig._resolve_authconfig_credstore' + ) as m: + m.return_value = None + assert 'indexuser' == auth.resolve_authconfig( + auth_config, None + )['username'] + + +class LoadConfigTest(unittest.TestCase): + def test_load_config_no_file(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg = auth.load_config(folder) + assert cfg is not None + + def test_load_legacy_config(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg_path = os.path.join(folder, '.dockercfg') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + with open(cfg_path, 'w') as f: + f.write('auth = {auth}\n'.format(auth=auth_)) + f.write('email = sakuya@scarlet.net') + + cfg = auth.load_config(cfg_path) + assert auth.resolve_authconfig(cfg) is not None + assert cfg.auths[auth.INDEX_NAME] is not None + cfg = cfg.auths[auth.INDEX_NAME] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == 'sakuya@scarlet.net' + assert cfg.get('Auth') is None + + def test_load_json_config(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg_path = os.path.join(folder, '.dockercfg') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + email = 'sakuya@scarlet.net' + with open(cfg_path, 'w') as f: + json.dump( + {auth.INDEX_URL: {'auth': auth_, 'email': email}}, f + ) + cfg = auth.load_config(cfg_path) + assert auth.resolve_authconfig(cfg) is not None + assert cfg.auths[auth.INDEX_URL] is not None + cfg = cfg.auths[auth.INDEX_URL] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == email + assert cfg.get('Auth') is None + + def test_load_modern_json_config(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg_path = os.path.join(folder, 'config.json') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + email = 'sakuya@scarlet.net' + with open(cfg_path, 'w') as f: + json.dump({ + 'auths': { + auth.INDEX_URL: { + 'auth': auth_, 'email': email + } + } + }, f) + cfg = auth.load_config(cfg_path) + assert auth.resolve_authconfig(cfg) is not None + assert cfg.auths[auth.INDEX_URL] is not None + cfg = cfg.auths[auth.INDEX_URL] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == email + + def test_load_config_with_random_name(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + + dockercfg_path = os.path.join(folder, + '.{0}.dockercfg'.format( + random.randrange(100000))) + registry = 'https://your.private.registry.io' + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + config = { + registry: { + 'auth': '{auth}'.format(auth=auth_), + 'email': 'sakuya@scarlet.net' + } + } + + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + cfg = auth.load_config(dockercfg_path).auths + assert registry in cfg + assert cfg[registry] is not None + cfg = cfg[registry] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == 'sakuya@scarlet.net' + assert cfg.get('auth') is None + + def test_load_config_custom_config_env(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + + dockercfg_path = os.path.join(folder, 'config.json') + registry = 'https://your.private.registry.io' + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + config = { + registry: { + 'auth': '{auth}'.format(auth=auth_), + 'email': 'sakuya@scarlet.net' + } + } + + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): + cfg = auth.load_config(None).auths + assert registry in cfg + assert cfg[registry] is not None + cfg = cfg[registry] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == 'sakuya@scarlet.net' + assert cfg.get('auth') is None + + def test_load_config_custom_config_env_with_auths(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + + dockercfg_path = os.path.join(folder, 'config.json') + registry = 'https://your.private.registry.io' + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + config = { + 'auths': { + registry: { + 'auth': '{auth}'.format(auth=auth_), + 'email': 'sakuya@scarlet.net' + } + } + } + + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): + cfg = auth.load_config(None) + assert registry in cfg.auths + cfg = cfg.auths[registry] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == 'sakuya@scarlet.net' + assert cfg.get('auth') is None + + def test_load_config_custom_config_env_utf8(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + + dockercfg_path = os.path.join(folder, 'config.json') + registry = 'https://your.private.registry.io' + auth_ = base64.b64encode( + b'sakuya\xc3\xa6:izayoi\xc3\xa6').decode('ascii') + config = { + 'auths': { + registry: { + 'auth': '{auth}'.format(auth=auth_), + 'email': 'sakuya@scarlet.net' + } + } + } + + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): + cfg = auth.load_config(None) + assert registry in cfg.auths + cfg = cfg.auths[registry] + assert cfg['username'] == b'sakuya\xc3\xa6'.decode('utf8') + assert cfg['password'] == b'izayoi\xc3\xa6'.decode('utf8') + assert cfg['email'] == 'sakuya@scarlet.net' + assert cfg.get('auth') is None + + def test_load_config_unknown_keys(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + dockercfg_path = os.path.join(folder, 'config.json') + config = { + 'detachKeys': 'ctrl-q, ctrl-u, ctrl-i' + } + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + cfg = auth.load_config(dockercfg_path) + assert dict(cfg) == {'auths': {}} + + def test_load_config_invalid_auth_dict(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + dockercfg_path = os.path.join(folder, 'config.json') + config = { + 'auths': { + 'scarlet.net': {'sakuya': 'izayoi'} + } + } + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + cfg = auth.load_config(dockercfg_path) + assert dict(cfg) == {'auths': {'scarlet.net': {}}} + + def test_load_config_identity_token(self): + folder = tempfile.mkdtemp() + registry = 'scarlet.net' + token = '1ce1cebb-503e-7043-11aa-7feb8bd4a1ce' + self.addCleanup(shutil.rmtree, folder) + dockercfg_path = os.path.join(folder, 'config.json') + auth_entry = encode_auth({'username': 'sakuya'}).decode('ascii') + config = { + 'auths': { + registry: { + 'auth': auth_entry, + 'identitytoken': token + } + } + } + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + cfg = auth.load_config(dockercfg_path) + assert registry in cfg.auths + cfg = cfg.auths[registry] + assert 'IdentityToken' in cfg + assert cfg['IdentityToken'] == token + + +class CredstoreTest(unittest.TestCase): + def setUp(self): + self.authconfig = auth.AuthConfig({'credsStore': 'default'}) + self.default_store = InMemoryStore('default') + self.authconfig._stores['default'] = self.default_store + self.default_store.store( + 'https://gensokyo.jp/v2', 'sakuya', 'izayoi', + ) + self.default_store.store( + 'https://default.com/v2', 'user', 'hunter2', + ) + + def test_get_credential_store(self): + auth_config = auth.AuthConfig({ + 'credHelpers': { + 'registry1.io': 'truesecret', + 'registry2.io': 'powerlock' + }, + 'credsStore': 'blackbox', + }) + + assert auth_config.get_credential_store('registry1.io') == 'truesecret' + assert auth_config.get_credential_store('registry2.io') == 'powerlock' + assert auth_config.get_credential_store('registry3.io') == 'blackbox' + + def test_get_credential_store_no_default(self): + auth_config = auth.AuthConfig({ + 'credHelpers': { + 'registry1.io': 'truesecret', + 'registry2.io': 'powerlock' + }, + }) + assert auth_config.get_credential_store('registry2.io') == 'powerlock' + assert auth_config.get_credential_store('registry3.io') is None + + def test_get_credential_store_default_index(self): + auth_config = auth.AuthConfig({ + 'credHelpers': { + 'https://index.docker.io/v1/': 'powerlock' + }, + 'credsStore': 'truesecret' + }) + + assert auth_config.get_credential_store(None) == 'powerlock' + assert auth_config.get_credential_store('docker.io') == 'powerlock' + assert auth_config.get_credential_store('images.io') == 'truesecret' + + def test_get_credential_store_with_plain_dict(self): + auth_config = { + 'credHelpers': { + 'registry1.io': 'truesecret', + 'registry2.io': 'powerlock' + }, + 'credsStore': 'blackbox', + } + + assert auth.get_credential_store( + auth_config, 'registry1.io' + ) == 'truesecret' + assert auth.get_credential_store( + auth_config, 'registry2.io' + ) == 'powerlock' + assert auth.get_credential_store( + auth_config, 'registry3.io' + ) == 'blackbox' + + def test_get_all_credentials_credstore_only(self): + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + } + + def test_get_all_credentials_with_empty_credhelper(self): + self.authconfig['credHelpers'] = { + 'registry1.io': 'truesecret', + } + self.authconfig._stores['truesecret'] = InMemoryStore() + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'registry1.io': None, + } + + def test_get_all_credentials_with_credhelpers_only(self): + del self.authconfig['credsStore'] + assert self.authconfig.get_all_credentials() == {} + + self.authconfig['credHelpers'] = { + 'https://gensokyo.jp/v2': 'default', + 'https://default.com/v2': 'default', + } + + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + } + + def test_get_all_credentials_with_auths_entries(self): + self.authconfig.add_auth('registry1.io', { + 'ServerAddress': 'registry1.io', + 'Username': 'reimu', + 'Password': 'hakurei', + }) + + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'registry1.io': { + 'ServerAddress': 'registry1.io', + 'Username': 'reimu', + 'Password': 'hakurei', + }, + } + + def test_get_all_credentials_with_empty_auths_entry(self): + self.authconfig.add_auth('default.com', {}) + + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + } + + def test_get_all_credentials_credstore_overrides_auth_entry(self): + self.authconfig.add_auth('default.com', { + 'Username': 'shouldnotsee', + 'Password': 'thisentry', + 'ServerAddress': 'https://default.com/v2', + }) + + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + } + + def test_get_all_credentials_helpers_override_default(self): + self.authconfig['credHelpers'] = { + 'https://default.com/v2': 'truesecret', + } + truesecret = InMemoryStore('truesecret') + truesecret.store('https://default.com/v2', 'reimu', 'hakurei') + self.authconfig._stores['truesecret'] = truesecret + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'reimu', + 'Password': 'hakurei', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'reimu', + 'Password': 'hakurei', + 'ServerAddress': 'https://default.com/v2', + }, + } + + def test_get_all_credentials_3_sources(self): + self.authconfig['credHelpers'] = { + 'registry1.io': 'truesecret', + } + truesecret = InMemoryStore('truesecret') + truesecret.store('registry1.io', 'reimu', 'hakurei') + self.authconfig._stores['truesecret'] = truesecret + self.authconfig.add_auth('registry2.io', { + 'ServerAddress': 'registry2.io', + 'Username': 'reimu', + 'Password': 'hakurei', + }) + + assert self.authconfig.get_all_credentials() == { + 'https://gensokyo.jp/v2': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'gensokyo.jp': { + 'Username': 'sakuya', + 'Password': 'izayoi', + 'ServerAddress': 'https://gensokyo.jp/v2', + }, + 'https://default.com/v2': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'default.com': { + 'Username': 'user', + 'Password': 'hunter2', + 'ServerAddress': 'https://default.com/v2', + }, + 'registry1.io': { + 'ServerAddress': 'registry1.io', + 'Username': 'reimu', + 'Password': 'hakurei', + }, + 'registry2.io': { + 'ServerAddress': 'registry2.io', + 'Username': 'reimu', + 'Password': 'hakurei', + } + } + + +class InMemoryStore(Store): + def __init__(self, *args, **kwargs): + self.__store = {} + + def get(self, server): + try: + return self.__store[server] + except KeyError: + raise CredentialsNotFound() + + def store(self, server, username, secret): + self.__store[server] = { + 'ServerURL': server, + 'Username': username, + 'Secret': secret, + } + + def list(self): + return dict( + (k, v['Username']) for k, v in self.__store.items() + ) + + def erase(self, server): + del self.__store[server] diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_errors.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_errors.py new file mode 100644 index 00000000..2cc114ed --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_errors.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest +import sys + +import pytest +import requests + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.errors import ( + APIError, DockerException, + create_unexpected_kwargs_error, + create_api_error_from_http_exception, +) + + +class APIErrorTest(unittest.TestCase): + def test_api_error_is_caught_by_dockerexception(self): + try: + raise APIError("this should be caught by DockerException") + except DockerException: + pass + + def test_status_code_200(self): + """The status_code property is present with 200 response.""" + resp = requests.Response() + resp.status_code = 200 + err = APIError('', response=resp) + assert err.status_code == 200 + + def test_status_code_400(self): + """The status_code property is present with 400 response.""" + resp = requests.Response() + resp.status_code = 400 + err = APIError('', response=resp) + assert err.status_code == 400 + + def test_status_code_500(self): + """The status_code property is present with 500 response.""" + resp = requests.Response() + resp.status_code = 500 + err = APIError('', response=resp) + assert err.status_code == 500 + + def test_is_server_error_200(self): + """Report not server error on 200 response.""" + resp = requests.Response() + resp.status_code = 200 + err = APIError('', response=resp) + assert err.is_server_error() is False + + def test_is_server_error_300(self): + """Report not server error on 300 response.""" + resp = requests.Response() + resp.status_code = 300 + err = APIError('', response=resp) + assert err.is_server_error() is False + + def test_is_server_error_400(self): + """Report not server error on 400 response.""" + resp = requests.Response() + resp.status_code = 400 + err = APIError('', response=resp) + assert err.is_server_error() is False + + def test_is_server_error_500(self): + """Report server error on 500 response.""" + resp = requests.Response() + resp.status_code = 500 + err = APIError('', response=resp) + assert err.is_server_error() is True + + def test_is_client_error_500(self): + """Report not client error on 500 response.""" + resp = requests.Response() + resp.status_code = 500 + err = APIError('', response=resp) + assert err.is_client_error() is False + + def test_is_client_error_400(self): + """Report client error on 400 response.""" + resp = requests.Response() + resp.status_code = 400 + err = APIError('', response=resp) + assert err.is_client_error() is True + + def test_is_error_300(self): + """Report no error on 300 response.""" + resp = requests.Response() + resp.status_code = 300 + err = APIError('', response=resp) + assert err.is_error() is False + + def test_is_error_400(self): + """Report error on 400 response.""" + resp = requests.Response() + resp.status_code = 400 + err = APIError('', response=resp) + assert err.is_error() is True + + def test_is_error_500(self): + """Report error on 500 response.""" + resp = requests.Response() + resp.status_code = 500 + err = APIError('', response=resp) + assert err.is_error() is True + + def test_create_error_from_exception(self): + resp = requests.Response() + resp.status_code = 500 + err = APIError('') + try: + resp.raise_for_status() + except requests.exceptions.HTTPError as e: + try: + create_api_error_from_http_exception(e) + except APIError as e: + err = e + assert err.is_server_error() is True + + +class CreateUnexpectedKwargsErrorTest(unittest.TestCase): + def test_create_unexpected_kwargs_error_single(self): + e = create_unexpected_kwargs_error('f', {'foo': 'bar'}) + assert str(e) == "f() got an unexpected keyword argument 'foo'" + + def test_create_unexpected_kwargs_error_multiple(self): + e = create_unexpected_kwargs_error('f', {'foo': 'bar', 'baz': 'bosh'}) + assert str(e) == "f() got unexpected keyword arguments 'baz', 'foo'" diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_sshconn.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_sshconn.py new file mode 100644 index 00000000..e9189f3e --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_sshconn.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.transport.sshconn import SSHSocket, SSHHTTPAdapter + + +class SSHAdapterTest(unittest.TestCase): + @staticmethod + def test_ssh_hostname_prefix_trim(): + conn = SSHHTTPAdapter( + base_url="ssh://user@hostname:1234", shell_out=True) + assert conn.ssh_host == "user@hostname:1234" + + @staticmethod + def test_ssh_parse_url(): + c = SSHSocket(host="user@hostname:1234") + assert c.host == "hostname" + assert c.port == "1234" + assert c.user == "user" + + @staticmethod + def test_ssh_parse_hostname_only(): + c = SSHSocket(host="hostname") + assert c.host == "hostname" + assert c.port is None + assert c.user is None + + @staticmethod + def test_ssh_parse_user_and_hostname(): + c = SSHSocket(host="user@hostname") + assert c.host == "hostname" + assert c.port is None + assert c.user == "user" + + @staticmethod + def test_ssh_parse_hostname_and_port(): + c = SSHSocket(host="hostname:22") + assert c.host == "hostname" + assert c.port == "22" + assert c.user is None diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_ssladapter.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_ssladapter.py new file mode 100644 index 00000000..428163e6 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_ssladapter.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.transport import ssladapter + +try: + from backports.ssl_match_hostname import ( + match_hostname, CertificateError + ) +except ImportError: + from ssl import ( + match_hostname, CertificateError + ) + +try: + from ssl import OP_NO_SSLv3, OP_NO_SSLv2, OP_NO_TLSv1 +except ImportError: + OP_NO_SSLv2 = 0x1000000 + OP_NO_SSLv3 = 0x2000000 + OP_NO_TLSv1 = 0x4000000 + + +class SSLAdapterTest(unittest.TestCase): + def test_only_uses_tls(self): + ssl_context = ssladapter.urllib3.util.ssl_.create_urllib3_context() + + assert ssl_context.options & OP_NO_SSLv3 + # if OpenSSL is compiled without SSL2 support, OP_NO_SSLv2 will be 0 + assert not bool(OP_NO_SSLv2) or ssl_context.options & OP_NO_SSLv2 + assert not ssl_context.options & OP_NO_TLSv1 + + +class MatchHostnameTest(unittest.TestCase): + cert = { + 'issuer': ( + (('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'San Francisco'),), + (('organizationName', 'Docker Inc'),), + (('organizationalUnitName', 'Docker-Python'),), + (('commonName', 'localhost'),), + (('emailAddress', 'info@docker.com'),) + ), + 'notAfter': 'Mar 25 23:08:23 2030 GMT', + 'notBefore': 'Mar 25 23:08:23 2016 GMT', + 'serialNumber': 'BD5F894C839C548F', + 'subject': ( + (('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'San Francisco'),), + (('organizationName', 'Docker Inc'),), + (('organizationalUnitName', 'Docker-Python'),), + (('commonName', 'localhost'),), + (('emailAddress', 'info@docker.com'),) + ), + 'subjectAltName': ( + ('DNS', 'localhost'), + ('DNS', '*.gensokyo.jp'), + ('IP Address', '127.0.0.1'), + ), + 'version': 3 + } + + def test_match_ip_address_success(self): + assert match_hostname(self.cert, '127.0.0.1') is None + + def test_match_localhost_success(self): + assert match_hostname(self.cert, 'localhost') is None + + def test_match_dns_success(self): + assert match_hostname(self.cert, 'touhou.gensokyo.jp') is None + + def test_match_ip_address_failure(self): + with pytest.raises(CertificateError): + match_hostname(self.cert, '192.168.0.25') + + def test_match_dns_failure(self): + with pytest.raises(CertificateError): + match_hostname(self.cert, 'foobar.co.uk') diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_build.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_build.py new file mode 100644 index 00000000..50eb703d --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_build.py @@ -0,0 +1,515 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import os.path +import shutil +import socket +import tarfile +import tempfile +import unittest +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.constants import IS_WINDOWS_PLATFORM +from ansible_collections.community.docker.plugins.module_utils._api.utils.build import exclude_paths, tar + + +def make_tree(dirs, files): + base = tempfile.mkdtemp() + + for path in dirs: + os.makedirs(os.path.join(base, path)) + + for path in files: + with open(os.path.join(base, path), 'w') as f: + f.write("content") + + return base + + +def convert_paths(collection): + return set(map(convert_path, collection)) + + +def convert_path(path): + return path.replace('/', os.path.sep) + + +class ExcludePathsTest(unittest.TestCase): + dirs = [ + 'foo', + 'foo/bar', + 'bar', + 'target', + 'target/subdir', + 'subdir', + 'subdir/target', + 'subdir/target/subdir', + 'subdir/subdir2', + 'subdir/subdir2/target', + 'subdir/subdir2/target/subdir' + ] + + files = [ + 'Dockerfile', + 'Dockerfile.alt', + '.dockerignore', + 'a.py', + 'a.go', + 'b.py', + 'cde.py', + 'foo/a.py', + 'foo/b.py', + 'foo/bar/a.py', + 'bar/a.py', + 'foo/Dockerfile3', + 'target/file.txt', + 'target/subdir/file.txt', + 'subdir/file.txt', + 'subdir/target/file.txt', + 'subdir/target/subdir/file.txt', + 'subdir/subdir2/file.txt', + 'subdir/subdir2/target/file.txt', + 'subdir/subdir2/target/subdir/file.txt', + ] + + all_paths = set(dirs + files) + + def setUp(self): + self.base = make_tree(self.dirs, self.files) + + def tearDown(self): + shutil.rmtree(self.base) + + def exclude(self, patterns, dockerfile=None): + return set(exclude_paths(self.base, patterns, dockerfile=dockerfile)) + + def test_no_excludes(self): + assert self.exclude(['']) == convert_paths(self.all_paths) + + def test_no_dupes(self): + paths = exclude_paths(self.base, ['!a.py']) + assert sorted(paths) == sorted(set(paths)) + + def test_wildcard_exclude(self): + assert self.exclude(['*']) == set(['Dockerfile', '.dockerignore']) + + def test_exclude_dockerfile_dockerignore(self): + """ + Even if the .dockerignore file explicitly says to exclude + Dockerfile and/or .dockerignore, don't exclude them from + the actual tar file. + """ + assert self.exclude(['Dockerfile', '.dockerignore']) == convert_paths( + self.all_paths + ) + + def test_exclude_custom_dockerfile(self): + """ + If we're using a custom Dockerfile, make sure that's not + excluded. + """ + assert self.exclude(['*'], dockerfile='Dockerfile.alt') == set(['Dockerfile.alt', '.dockerignore']) + + assert self.exclude( + ['*'], dockerfile='foo/Dockerfile3' + ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore'])) + + # https://github.com/docker/docker-py/issues/1956 + assert self.exclude( + ['*'], dockerfile='./foo/Dockerfile3' + ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore'])) + + def test_exclude_dockerfile_child(self): + includes = self.exclude(['foo/'], dockerfile='foo/Dockerfile3') + assert convert_path('foo/Dockerfile3') in includes + assert convert_path('foo/a.py') not in includes + + def test_single_filename(self): + assert self.exclude(['a.py']) == convert_paths( + self.all_paths - set(['a.py']) + ) + + def test_single_filename_leading_dot_slash(self): + assert self.exclude(['./a.py']) == convert_paths( + self.all_paths - set(['a.py']) + ) + + # As odd as it sounds, a filename pattern with a trailing slash on the + # end *will* result in that file being excluded. + def test_single_filename_trailing_slash(self): + assert self.exclude(['a.py/']) == convert_paths( + self.all_paths - set(['a.py']) + ) + + def test_wildcard_filename_start(self): + assert self.exclude(['*.py']) == convert_paths( + self.all_paths - set(['a.py', 'b.py', 'cde.py']) + ) + + def test_wildcard_with_exception(self): + assert self.exclude(['*.py', '!b.py']) == convert_paths( + self.all_paths - set(['a.py', 'cde.py']) + ) + + def test_wildcard_with_wildcard_exception(self): + assert self.exclude(['*.*', '!*.go']) == convert_paths( + self.all_paths - set([ + 'a.py', 'b.py', 'cde.py', 'Dockerfile.alt', + ]) + ) + + def test_wildcard_filename_end(self): + assert self.exclude(['a.*']) == convert_paths( + self.all_paths - set(['a.py', 'a.go']) + ) + + def test_question_mark(self): + assert self.exclude(['?.py']) == convert_paths( + self.all_paths - set(['a.py', 'b.py']) + ) + + def test_single_subdir_single_filename(self): + assert self.exclude(['foo/a.py']) == convert_paths( + self.all_paths - set(['foo/a.py']) + ) + + def test_single_subdir_single_filename_leading_slash(self): + assert self.exclude(['/foo/a.py']) == convert_paths( + self.all_paths - set(['foo/a.py']) + ) + + def test_exclude_include_absolute_path(self): + base = make_tree([], ['a.py', 'b.py']) + assert exclude_paths( + base, + ['/*', '!/*.py'] + ) == set(['a.py', 'b.py']) + + def test_single_subdir_with_path_traversal(self): + assert self.exclude(['foo/whoops/../a.py']) == convert_paths( + self.all_paths - set(['foo/a.py']) + ) + + def test_single_subdir_wildcard_filename(self): + assert self.exclude(['foo/*.py']) == convert_paths( + self.all_paths - set(['foo/a.py', 'foo/b.py']) + ) + + def test_wildcard_subdir_single_filename(self): + assert self.exclude(['*/a.py']) == convert_paths( + self.all_paths - set(['foo/a.py', 'bar/a.py']) + ) + + def test_wildcard_subdir_wildcard_filename(self): + assert self.exclude(['*/*.py']) == convert_paths( + self.all_paths - set(['foo/a.py', 'foo/b.py', 'bar/a.py']) + ) + + def test_directory(self): + assert self.exclude(['foo']) == convert_paths( + self.all_paths - set([ + 'foo', 'foo/a.py', 'foo/b.py', 'foo/bar', 'foo/bar/a.py', + 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_trailing_slash(self): + assert self.exclude(['foo']) == convert_paths( + self.all_paths - set([ + 'foo', 'foo/a.py', 'foo/b.py', + 'foo/bar', 'foo/bar/a.py', 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_single_exception(self): + assert self.exclude(['foo', '!foo/bar/a.py']) == convert_paths( + self.all_paths - set([ + 'foo/a.py', 'foo/b.py', 'foo', 'foo/bar', + 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_subdir_exception(self): + assert self.exclude(['foo', '!foo/bar']) == convert_paths( + self.all_paths - set([ + 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3' + ]) + ) + + @pytest.mark.skipif( + not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows' + ) + def test_directory_with_subdir_exception_win32_pathsep(self): + assert self.exclude(['foo', '!foo\\bar']) == convert_paths( + self.all_paths - set([ + 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_wildcard_exception(self): + assert self.exclude(['foo', '!foo/*.py']) == convert_paths( + self.all_paths - set([ + 'foo/bar', 'foo/bar/a.py', 'foo', 'foo/Dockerfile3' + ]) + ) + + def test_subdirectory(self): + assert self.exclude(['foo/bar']) == convert_paths( + self.all_paths - set(['foo/bar', 'foo/bar/a.py']) + ) + + @pytest.mark.skipif( + not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows' + ) + def test_subdirectory_win32_pathsep(self): + assert self.exclude(['foo\\bar']) == convert_paths( + self.all_paths - set(['foo/bar', 'foo/bar/a.py']) + ) + + def test_double_wildcard(self): + assert self.exclude(['**/a.py']) == convert_paths( + self.all_paths - set([ + 'a.py', 'foo/a.py', 'foo/bar/a.py', 'bar/a.py' + ]) + ) + + assert self.exclude(['foo/**/bar']) == convert_paths( + self.all_paths - set(['foo/bar', 'foo/bar/a.py']) + ) + + def test_single_and_double_wildcard(self): + assert self.exclude(['**/target/*/*']) == convert_paths( + self.all_paths - set([ + 'target/subdir/file.txt', + 'subdir/target/subdir/file.txt', + 'subdir/subdir2/target/subdir/file.txt', + ]) + ) + + def test_trailing_double_wildcard(self): + assert self.exclude(['subdir/**']) == convert_paths( + self.all_paths - set([ + 'subdir/file.txt', + 'subdir/target/file.txt', + 'subdir/target/subdir/file.txt', + 'subdir/subdir2/file.txt', + 'subdir/subdir2/target/file.txt', + 'subdir/subdir2/target/subdir/file.txt', + 'subdir/target', + 'subdir/target/subdir', + 'subdir/subdir2', + 'subdir/subdir2/target', + 'subdir/subdir2/target/subdir', + ]) + ) + + def test_double_wildcard_with_exception(self): + assert self.exclude(['**', '!bar', '!foo/bar']) == convert_paths( + set([ + 'foo/bar', 'foo/bar/a.py', 'bar', 'bar/a.py', 'Dockerfile', + '.dockerignore', + ]) + ) + + def test_include_wildcard(self): + # This may be surprising but it matches the CLI's behavior + # (tested with 18.05.0-ce on linux) + base = make_tree(['a'], ['a/b.py']) + assert exclude_paths( + base, + ['*', '!*/b.py'] + ) == set() + + def test_last_line_precedence(self): + base = make_tree( + [], + ['garbage.md', + 'trash.md', + 'README.md', + 'README-bis.md', + 'README-secret.md']) + assert exclude_paths( + base, + ['*.md', '!README*.md', 'README-secret.md'] + ) == set(['README.md', 'README-bis.md']) + + def test_parent_directory(self): + base = make_tree( + [], + ['a.py', + 'b.py', + 'c.py']) + # Dockerignore reference stipulates that absolute paths are + # equivalent to relative paths, hence /../foo should be + # equivalent to ../foo. It also stipulates that paths are run + # through Go's filepath.Clean, which explicitly "replace + # "/.." by "/" at the beginning of a path". + assert exclude_paths( + base, + ['../a.py', '/../b.py'] + ) == set(['c.py']) + + +class TarTest(unittest.TestCase): + def test_tar_with_excludes(self): + dirs = [ + 'foo', + 'foo/bar', + 'bar', + ] + + files = [ + 'Dockerfile', + 'Dockerfile.alt', + '.dockerignore', + 'a.py', + 'a.go', + 'b.py', + 'cde.py', + 'foo/a.py', + 'foo/b.py', + 'foo/bar/a.py', + 'bar/a.py', + ] + + exclude = [ + '*.py', + '!b.py', + '!a.go', + 'foo', + 'Dockerfile*', + '.dockerignore', + ] + + expected_names = set([ + 'Dockerfile', + '.dockerignore', + 'a.go', + 'b.py', + 'bar', + 'bar/a.py', + ]) + + base = make_tree(dirs, files) + self.addCleanup(shutil.rmtree, base) + + with tar(base, exclude=exclude) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == sorted(expected_names) + + def test_tar_with_empty_directory(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'foo'] + + @pytest.mark.skipif( + IS_WINDOWS_PLATFORM or os.geteuid() == 0, + reason='root user always has access ; no chmod on Windows' + ) + def test_tar_with_inaccessible_file(self): + base = tempfile.mkdtemp() + full_path = os.path.join(base, 'foo') + self.addCleanup(shutil.rmtree, base) + with open(full_path, 'w') as f: + f.write('content') + os.chmod(full_path, 0o222) + with pytest.raises(IOError) as ei: + tar(base) + + assert 'Can not read file in context: {full_path}'.format(full_path=full_path) in ( + ei.exconly() + ) + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_with_file_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + with open(os.path.join(base, 'foo'), 'w') as f: + f.write("content") + os.makedirs(os.path.join(base, 'bar')) + os.symlink('../foo', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_with_directory_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + os.symlink('../foo', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_with_broken_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + + os.symlink('../baz', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No UNIX sockets on Win32') + def test_tar_socket_file(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + sock = socket.socket(socket.AF_UNIX) + self.addCleanup(sock.close) + sock.bind(os.path.join(base, 'test.sock')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'foo'] + + def tar_test_negative_mtime_bug(self): + base = tempfile.mkdtemp() + filename = os.path.join(base, 'th.txt') + self.addCleanup(shutil.rmtree, base) + with open(filename, 'w') as f: + f.write('Invisible Full Moon') + os.utime(filename, (12345, -3600.0)) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert tar_data.getnames() == ['th.txt'] + assert tar_data.getmember('th.txt').mtime == -3600 + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_directory_link(self): + dirs = ['a', 'b', 'a/c'] + files = ['a/hello.py', 'b/utils.py', 'a/c/descend.py'] + base = make_tree(dirs, files) + self.addCleanup(shutil.rmtree, base) + os.symlink(os.path.join(base, 'b'), os.path.join(base, 'a/c/b')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + names = tar_data.getnames() + for member in dirs + files: + assert member in names + assert 'a/c/b' in names + assert 'a/c/b/utils.py' not in names diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_config.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_config.py new file mode 100644 index 00000000..9448f384 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_config.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import unittest +import shutil +import tempfile +import json +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from pytest import mark, fixture + +from ansible_collections.community.docker.plugins.module_utils._api.utils import config + +try: + from unittest import mock +except ImportError: + import mock + + +class FindConfigFileTest(unittest.TestCase): + + @fixture(autouse=True) + def tmpdir(self, tmpdir): + self.mkdir = tmpdir.mkdir + + def test_find_config_fallback(self): + tmpdir = self.mkdir('test_find_config_fallback') + + with mock.patch.dict(os.environ, {'HOME': str(tmpdir)}): + assert config.find_config_file() is None + + def test_find_config_from_explicit_path(self): + tmpdir = self.mkdir('test_find_config_from_explicit_path') + config_path = tmpdir.ensure('my-config-file.json') + + assert config.find_config_file(str(config_path)) == str(config_path) + + def test_find_config_from_environment(self): + tmpdir = self.mkdir('test_find_config_from_environment') + config_path = tmpdir.ensure('config.json') + + with mock.patch.dict(os.environ, {'DOCKER_CONFIG': str(tmpdir)}): + assert config.find_config_file() == str(config_path) + + @mark.skipif("sys.platform == 'win32'") + def test_find_config_from_home_posix(self): + tmpdir = self.mkdir('test_find_config_from_home_posix') + config_path = tmpdir.ensure('.docker', 'config.json') + + with mock.patch.dict(os.environ, {'HOME': str(tmpdir)}): + assert config.find_config_file() == str(config_path) + + @mark.skipif("sys.platform == 'win32'") + def test_find_config_from_home_legacy_name(self): + tmpdir = self.mkdir('test_find_config_from_home_legacy_name') + config_path = tmpdir.ensure('.dockercfg') + + with mock.patch.dict(os.environ, {'HOME': str(tmpdir)}): + assert config.find_config_file() == str(config_path) + + @mark.skipif("sys.platform != 'win32'") + def test_find_config_from_home_windows(self): + tmpdir = self.mkdir('test_find_config_from_home_windows') + config_path = tmpdir.ensure('.docker', 'config.json') + + with mock.patch.dict(os.environ, {'USERPROFILE': str(tmpdir)}): + assert config.find_config_file() == str(config_path) + + +class LoadConfigTest(unittest.TestCase): + def test_load_config_no_file(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg = config.load_general_config(folder) + assert cfg is not None + assert isinstance(cfg, dict) + assert not cfg + + def test_load_config_custom_headers(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + + dockercfg_path = os.path.join(folder, 'config.json') + config_data = { + 'HttpHeaders': { + 'Name': 'Spike', + 'Surname': 'Spiegel' + }, + } + + with open(dockercfg_path, 'w') as f: + json.dump(config_data, f) + + cfg = config.load_general_config(dockercfg_path) + assert 'HttpHeaders' in cfg + assert cfg['HttpHeaders'] == { + 'Name': 'Spike', + 'Surname': 'Spiegel' + } + + def test_load_config_detach_keys(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + dockercfg_path = os.path.join(folder, 'config.json') + config_data = { + 'detachKeys': 'ctrl-q, ctrl-u, ctrl-i' + } + with open(dockercfg_path, 'w') as f: + json.dump(config_data, f) + + cfg = config.load_general_config(dockercfg_path) + assert cfg == config_data + + def test_load_config_from_env(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + dockercfg_path = os.path.join(folder, 'config.json') + config_data = { + 'detachKeys': 'ctrl-q, ctrl-u, ctrl-i' + } + with open(dockercfg_path, 'w') as f: + json.dump(config_data, f) + + with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): + cfg = config.load_general_config(None) + assert cfg == config_data diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_decorators.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_decorators.py new file mode 100644 index 00000000..8ba1ec5f --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_decorators.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.api.client import APIClient +from ansible_collections.community.docker.plugins.module_utils._api.constants import DEFAULT_DOCKER_API_VERSION +from ansible_collections.community.docker.plugins.module_utils._api.utils.decorators import update_headers + + +class DecoratorsTest(unittest.TestCase): + def test_update_headers(self): + sample_headers = { + 'X-Docker-Locale': 'en-US', + } + + def f(self, headers=None): + return headers + + client = APIClient(version=DEFAULT_DOCKER_API_VERSION) + client._general_configs = {} + + g = update_headers(f) + assert g(client, headers=None) is None + assert g(client, headers={}) == {} + assert g(client, headers={'Content-type': 'application/json'}) == { + 'Content-type': 'application/json', + } + + client._general_configs = { + 'HttpHeaders': sample_headers + } + + assert g(client, headers=None) == sample_headers + assert g(client, headers={}) == sample_headers + assert g(client, headers={'Content-type': 'application/json'}) == { + 'Content-type': 'application/json', + 'X-Docker-Locale': 'en-US', + } diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_json_stream.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_json_stream.py new file mode 100644 index 00000000..2d7f300f --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_json_stream.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.utils.json_stream import json_splitter, stream_as_text, json_stream + + +class TestJsonSplitter: + + def test_json_splitter_no_object(self): + data = '{"foo": "bar' + assert json_splitter(data) is None + + def test_json_splitter_with_object(self): + data = '{"foo": "bar"}\n \n{"next": "obj"}' + assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}') + + def test_json_splitter_leading_whitespace(self): + data = '\n \r{"foo": "bar"}\n\n {"next": "obj"}' + assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}') + + +class TestStreamAsText: + + def test_stream_with_non_utf_unicode_character(self): + stream = [b'\xed\xf3\xf3'] + output, = stream_as_text(stream) + assert output == u'���' + + def test_stream_with_utf_character(self): + stream = [u'ěĝ'.encode('utf-8')] + output, = stream_as_text(stream) + assert output == u'ěĝ' + + +class TestJsonStream: + + def test_with_falsy_entries(self): + stream = [ + '{"one": "two"}\n{}\n', + "[1, 2, 3]\n[]\n", + ] + output = list(json_stream(stream)) + assert output == [ + {'one': 'two'}, + {}, + [1, 2, 3], + [], + ] + + def test_with_leading_whitespace(self): + stream = [ + '\n \r\n {"one": "two"}{"x": 1}', + ' {"three": "four"}\t\t{"x": 2}' + ] + output = list(json_stream(stream)) + assert output == [ + {'one': 'two'}, + {'x': 1}, + {'three': 'four'}, + {'x': 2} + ] diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_ports.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_ports.py new file mode 100644 index 00000000..c1a08a12 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_ports.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.utils.ports import build_port_bindings, split_port + + +class PortsTest(unittest.TestCase): + def test_split_port_with_host_ip(self): + internal_port, external_port = split_port("127.0.0.1:1000:2000") + assert internal_port == ["2000"] + assert external_port == [("127.0.0.1", "1000")] + + def test_split_port_with_protocol(self): + for protocol in ['tcp', 'udp', 'sctp']: + internal_port, external_port = split_port( + "127.0.0.1:1000:2000/" + protocol + ) + assert internal_port == ["2000/" + protocol] + assert external_port == [("127.0.0.1", "1000")] + + def test_split_port_with_host_ip_no_port(self): + internal_port, external_port = split_port("127.0.0.1::2000") + assert internal_port == ["2000"] + assert external_port == [("127.0.0.1", None)] + + def test_split_port_range_with_host_ip_no_port(self): + internal_port, external_port = split_port("127.0.0.1::2000-2001") + assert internal_port == ["2000", "2001"] + assert external_port == [("127.0.0.1", None), ("127.0.0.1", None)] + + def test_split_port_with_host_port(self): + internal_port, external_port = split_port("1000:2000") + assert internal_port == ["2000"] + assert external_port == ["1000"] + + def test_split_port_range_with_host_port(self): + internal_port, external_port = split_port("1000-1001:2000-2001") + assert internal_port == ["2000", "2001"] + assert external_port == ["1000", "1001"] + + def test_split_port_random_port_range_with_host_port(self): + internal_port, external_port = split_port("1000-1001:2000") + assert internal_port == ["2000"] + assert external_port == ["1000-1001"] + + def test_split_port_no_host_port(self): + internal_port, external_port = split_port("2000") + assert internal_port == ["2000"] + assert external_port is None + + def test_split_port_range_no_host_port(self): + internal_port, external_port = split_port("2000-2001") + assert internal_port == ["2000", "2001"] + assert external_port is None + + def test_split_port_range_with_protocol(self): + internal_port, external_port = split_port( + "127.0.0.1:1000-1001:2000-2001/udp") + assert internal_port == ["2000/udp", "2001/udp"] + assert external_port == [("127.0.0.1", "1000"), ("127.0.0.1", "1001")] + + def test_split_port_with_ipv6_address(self): + internal_port, external_port = split_port( + "2001:abcd:ef00::2:1000:2000") + assert internal_port == ["2000"] + assert external_port == [("2001:abcd:ef00::2", "1000")] + + def test_split_port_with_ipv6_square_brackets_address(self): + internal_port, external_port = split_port( + "[2001:abcd:ef00::2]:1000:2000") + assert internal_port == ["2000"] + assert external_port == [("2001:abcd:ef00::2", "1000")] + + def test_split_port_invalid(self): + with pytest.raises(ValueError): + split_port("0.0.0.0:1000:2000:tcp") + + def test_split_port_invalid_protocol(self): + with pytest.raises(ValueError): + split_port("0.0.0.0:1000:2000/ftp") + + def test_non_matching_length_port_ranges(self): + with pytest.raises(ValueError): + split_port("0.0.0.0:1000-1010:2000-2002/tcp") + + def test_port_and_range_invalid(self): + with pytest.raises(ValueError): + split_port("0.0.0.0:1000:2000-2002/tcp") + + def test_port_only_with_colon(self): + with pytest.raises(ValueError): + split_port(":80") + + def test_host_only_with_colon(self): + with pytest.raises(ValueError): + split_port("localhost:") + + def test_with_no_container_port(self): + with pytest.raises(ValueError): + split_port("localhost:80:") + + def test_split_port_empty_string(self): + with pytest.raises(ValueError): + split_port("") + + def test_split_port_non_string(self): + assert split_port(1243) == (['1243'], None) + + def test_build_port_bindings_with_one_port(self): + port_bindings = build_port_bindings(["127.0.0.1:1000:1000"]) + assert port_bindings["1000"] == [("127.0.0.1", "1000")] + + def test_build_port_bindings_with_matching_internal_ports(self): + port_bindings = build_port_bindings( + ["127.0.0.1:1000:1000", "127.0.0.1:2000:1000"]) + assert port_bindings["1000"] == [ + ("127.0.0.1", "1000"), ("127.0.0.1", "2000") + ] + + def test_build_port_bindings_with_nonmatching_internal_ports(self): + port_bindings = build_port_bindings( + ["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"]) + assert port_bindings["1000"] == [("127.0.0.1", "1000")] + assert port_bindings["2000"] == [("127.0.0.1", "2000")] + + def test_build_port_bindings_with_port_range(self): + port_bindings = build_port_bindings(["127.0.0.1:1000-1001:1000-1001"]) + assert port_bindings["1000"] == [("127.0.0.1", "1000")] + assert port_bindings["1001"] == [("127.0.0.1", "1001")] + + def test_build_port_bindings_with_matching_internal_port_ranges(self): + port_bindings = build_port_bindings( + ["127.0.0.1:1000-1001:1000-1001", "127.0.0.1:2000-2001:1000-1001"]) + assert port_bindings["1000"] == [ + ("127.0.0.1", "1000"), ("127.0.0.1", "2000") + ] + assert port_bindings["1001"] == [ + ("127.0.0.1", "1001"), ("127.0.0.1", "2001") + ] + + def test_build_port_bindings_with_nonmatching_internal_port_ranges(self): + port_bindings = build_port_bindings( + ["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"]) + assert port_bindings["1000"] == [("127.0.0.1", "1000")] + assert port_bindings["2000"] == [("127.0.0.1", "2000")] diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_proxy.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_proxy.py new file mode 100644 index 00000000..0eb24270 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_proxy.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import unittest +import sys + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.utils.proxy import ProxyConfig + + +HTTP = 'http://test:80' +HTTPS = 'https://test:443' +FTP = 'ftp://user:password@host:23' +NO_PROXY = 'localhost,.localdomain' +CONFIG = ProxyConfig(http=HTTP, https=HTTPS, ftp=FTP, no_proxy=NO_PROXY) +ENV = { + 'http_proxy': HTTP, + 'HTTP_PROXY': HTTP, + 'https_proxy': HTTPS, + 'HTTPS_PROXY': HTTPS, + 'ftp_proxy': FTP, + 'FTP_PROXY': FTP, + 'no_proxy': NO_PROXY, + 'NO_PROXY': NO_PROXY, +} + + +class ProxyConfigTest(unittest.TestCase): + + def test_from_dict(self): + config = ProxyConfig.from_dict({ + 'httpProxy': HTTP, + 'httpsProxy': HTTPS, + 'ftpProxy': FTP, + 'noProxy': NO_PROXY + }) + self.assertEqual(CONFIG.http, config.http) + self.assertEqual(CONFIG.https, config.https) + self.assertEqual(CONFIG.ftp, config.ftp) + self.assertEqual(CONFIG.no_proxy, config.no_proxy) + + def test_new(self): + config = ProxyConfig() + self.assertIsNone(config.http) + self.assertIsNone(config.https) + self.assertIsNone(config.ftp) + self.assertIsNone(config.no_proxy) + + config = ProxyConfig(http='a', https='b', ftp='c', no_proxy='d') + self.assertEqual(config.http, 'a') + self.assertEqual(config.https, 'b') + self.assertEqual(config.ftp, 'c') + self.assertEqual(config.no_proxy, 'd') + + def test_truthiness(self): + assert not ProxyConfig() + assert ProxyConfig(http='non-zero') + assert ProxyConfig(https='non-zero') + assert ProxyConfig(ftp='non-zero') + assert ProxyConfig(no_proxy='non-zero') + + def test_environment(self): + self.assertDictEqual(CONFIG.get_environment(), ENV) + empty = ProxyConfig() + self.assertDictEqual(empty.get_environment(), {}) + + def test_inject_proxy_environment(self): + # Proxy config is non null, env is None. + self.assertSetEqual( + set(CONFIG.inject_proxy_environment(None)), + set('{k}={v}'.format(k=k, v=v) for k, v in ENV.items())) + + # Proxy config is null, env is None. + self.assertIsNone(ProxyConfig().inject_proxy_environment(None), None) + + env = ['FOO=BAR', 'BAR=BAZ'] + + # Proxy config is non null, env is non null + actual = CONFIG.inject_proxy_environment(env) + expected = ['{k}={v}'.format(k=k, v=v) for k, v in ENV.items()] + env + # It's important that the first 8 variables are the ones from the proxy + # config, and the last 2 are the ones from the input environment + self.assertSetEqual(set(actual[:8]), set(expected[:8])) + self.assertSetEqual(set(actual[-2:]), set(expected[-2:])) + + # Proxy is null, and is non null + self.assertListEqual(ProxyConfig().inject_proxy_environment(env), env) diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_utils.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_utils.py new file mode 100644 index 00000000..cc0dc695 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_utils.py @@ -0,0 +1,488 @@ +# -*- coding: utf-8 -*- +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import base64 +import json +import os +import os.path +import shutil +import tempfile +import unittest +import sys + +from ansible.module_utils.six import PY3 + +import pytest + +if sys.version_info < (2, 7): + pytestmark = pytest.mark.skip('Python 2.6 is not supported') + +from ansible_collections.community.docker.plugins.module_utils._api.api.client import APIClient +from ansible_collections.community.docker.plugins.module_utils._api.constants import IS_WINDOWS_PLATFORM, DEFAULT_DOCKER_API_VERSION +from ansible_collections.community.docker.plugins.module_utils._api.errors import DockerException +from ansible_collections.community.docker.plugins.module_utils._api.utils.utils import ( + convert_filters, convert_volume_binds, + decode_json_header, kwargs_from_env, parse_bytes, + parse_devices, parse_env_file, parse_host, + parse_repository_tag, split_command, format_environment, +) + + +TEST_CERT_DIR = os.path.join( + os.path.dirname(__file__), + 'testdata/certs', +) + + +class KwargsFromEnvTest(unittest.TestCase): + def setUp(self): + self.os_environ = os.environ.copy() + + def tearDown(self): + os.environ = self.os_environ + + def test_kwargs_from_env_empty(self): + os.environ.update(DOCKER_HOST='', + DOCKER_CERT_PATH='') + os.environ.pop('DOCKER_TLS_VERIFY', None) + + kwargs = kwargs_from_env() + assert kwargs.get('base_url') is None + assert kwargs.get('tls') is None + + def test_kwargs_from_env_tls(self): + os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376', + DOCKER_CERT_PATH=TEST_CERT_DIR, + DOCKER_TLS_VERIFY='1') + kwargs = kwargs_from_env(assert_hostname=False) + assert 'tcp://192.168.59.103:2376' == kwargs['base_url'] + assert 'ca.pem' in kwargs['tls'].ca_cert + assert 'cert.pem' in kwargs['tls'].cert[0] + assert 'key.pem' in kwargs['tls'].cert[1] + assert kwargs['tls'].assert_hostname is False + assert kwargs['tls'].verify + + parsed_host = parse_host(kwargs['base_url'], IS_WINDOWS_PLATFORM, True) + kwargs['version'] = DEFAULT_DOCKER_API_VERSION + try: + client = APIClient(**kwargs) + assert parsed_host == client.base_url + assert kwargs['tls'].ca_cert == client.verify + assert kwargs['tls'].cert == client.cert + except TypeError as e: + self.fail(e) + + def test_kwargs_from_env_tls_verify_false(self): + os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376', + DOCKER_CERT_PATH=TEST_CERT_DIR, + DOCKER_TLS_VERIFY='') + kwargs = kwargs_from_env(assert_hostname=True) + assert 'tcp://192.168.59.103:2376' == kwargs['base_url'] + assert 'ca.pem' in kwargs['tls'].ca_cert + assert 'cert.pem' in kwargs['tls'].cert[0] + assert 'key.pem' in kwargs['tls'].cert[1] + assert kwargs['tls'].assert_hostname is True + assert kwargs['tls'].verify is False + parsed_host = parse_host(kwargs['base_url'], IS_WINDOWS_PLATFORM, True) + kwargs['version'] = DEFAULT_DOCKER_API_VERSION + try: + client = APIClient(**kwargs) + assert parsed_host == client.base_url + assert kwargs['tls'].cert == client.cert + assert not kwargs['tls'].verify + except TypeError as e: + self.fail(e) + + def test_kwargs_from_env_tls_verify_false_no_cert(self): + temp_dir = tempfile.mkdtemp() + cert_dir = os.path.join(temp_dir, '.docker') + shutil.copytree(TEST_CERT_DIR, cert_dir) + + os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376', + HOME=temp_dir, + DOCKER_TLS_VERIFY='') + os.environ.pop('DOCKER_CERT_PATH', None) + kwargs = kwargs_from_env(assert_hostname=True) + assert 'tcp://192.168.59.103:2376' == kwargs['base_url'] + + def test_kwargs_from_env_no_cert_path(self): + try: + temp_dir = tempfile.mkdtemp() + cert_dir = os.path.join(temp_dir, '.docker') + shutil.copytree(TEST_CERT_DIR, cert_dir) + + os.environ.update(HOME=temp_dir, + DOCKER_CERT_PATH='', + DOCKER_TLS_VERIFY='1') + + kwargs = kwargs_from_env() + assert kwargs['tls'].verify + assert cert_dir in kwargs['tls'].ca_cert + assert cert_dir in kwargs['tls'].cert[0] + assert cert_dir in kwargs['tls'].cert[1] + finally: + if temp_dir: + shutil.rmtree(temp_dir) + + def test_kwargs_from_env_alternate_env(self): + # Values in os.environ are entirely ignored if an alternate is + # provided + os.environ.update( + DOCKER_HOST='tcp://192.168.59.103:2376', + DOCKER_CERT_PATH=TEST_CERT_DIR, + DOCKER_TLS_VERIFY='' + ) + kwargs = kwargs_from_env(environment={ + 'DOCKER_HOST': 'http://docker.gensokyo.jp:2581', + }) + assert 'http://docker.gensokyo.jp:2581' == kwargs['base_url'] + assert 'tls' not in kwargs + + +class ConverVolumeBindsTest(unittest.TestCase): + def test_convert_volume_binds_empty(self): + assert convert_volume_binds({}) == [] + assert convert_volume_binds([]) == [] + + def test_convert_volume_binds_list(self): + data = ['/a:/a:ro', '/b:/c:z'] + assert convert_volume_binds(data) == data + + def test_convert_volume_binds_complete(self): + data = { + '/mnt/vol1': { + 'bind': '/data', + 'mode': 'ro' + } + } + assert convert_volume_binds(data) == ['/mnt/vol1:/data:ro'] + + def test_convert_volume_binds_compact(self): + data = { + '/mnt/vol1': '/data' + } + assert convert_volume_binds(data) == ['/mnt/vol1:/data:rw'] + + def test_convert_volume_binds_no_mode(self): + data = { + '/mnt/vol1': { + 'bind': '/data' + } + } + assert convert_volume_binds(data) == ['/mnt/vol1:/data:rw'] + + def test_convert_volume_binds_unicode_bytes_input(self): + expected = [u'/mnt/지연:/unicode/박:rw'] + + data = { + u'/mnt/지연'.encode('utf-8'): { + 'bind': u'/unicode/박'.encode('utf-8'), + 'mode': u'rw' + } + } + assert convert_volume_binds(data) == expected + + def test_convert_volume_binds_unicode_unicode_input(self): + expected = [u'/mnt/지연:/unicode/박:rw'] + + data = { + u'/mnt/지연': { + 'bind': u'/unicode/박', + 'mode': u'rw' + } + } + assert convert_volume_binds(data) == expected + + +class ParseEnvFileTest(unittest.TestCase): + def generate_tempfile(self, file_content=None): + """ + Generates a temporary file for tests with the content + of 'file_content' and returns the filename. + Don't forget to unlink the file with os.unlink() after. + """ + local_tempfile = tempfile.NamedTemporaryFile(delete=False) + local_tempfile.write(file_content.encode('UTF-8')) + local_tempfile.close() + return local_tempfile.name + + def test_parse_env_file_proper(self): + env_file = self.generate_tempfile( + file_content='USER=jdoe\nPASS=secret') + get_parse_env_file = parse_env_file(env_file) + assert get_parse_env_file == {'USER': 'jdoe', 'PASS': 'secret'} + os.unlink(env_file) + + def test_parse_env_file_with_equals_character(self): + env_file = self.generate_tempfile( + file_content='USER=jdoe\nPASS=sec==ret') + get_parse_env_file = parse_env_file(env_file) + assert get_parse_env_file == {'USER': 'jdoe', 'PASS': 'sec==ret'} + os.unlink(env_file) + + def test_parse_env_file_commented_line(self): + env_file = self.generate_tempfile( + file_content='USER=jdoe\n#PASS=secret') + get_parse_env_file = parse_env_file(env_file) + assert get_parse_env_file == {'USER': 'jdoe'} + os.unlink(env_file) + + def test_parse_env_file_newline(self): + env_file = self.generate_tempfile( + file_content='\nUSER=jdoe\n\n\nPASS=secret') + get_parse_env_file = parse_env_file(env_file) + assert get_parse_env_file == {'USER': 'jdoe', 'PASS': 'secret'} + os.unlink(env_file) + + def test_parse_env_file_invalid_line(self): + env_file = self.generate_tempfile( + file_content='USER jdoe') + with pytest.raises(DockerException): + parse_env_file(env_file) + os.unlink(env_file) + + +class ParseHostTest(unittest.TestCase): + def test_parse_host(self): + invalid_hosts = [ + '0.0.0.0', + 'tcp://', + 'udp://127.0.0.1', + 'udp://127.0.0.1:2375', + 'ssh://:22/path', + 'tcp://netloc:3333/path?q=1', + 'unix:///sock/path#fragment', + 'https://netloc:3333/path;params', + 'ssh://:clearpassword@host:22', + ] + + valid_hosts = { + '0.0.0.1:5555': 'http://0.0.0.1:5555', + ':6666': 'http://127.0.0.1:6666', + 'tcp://:7777': 'http://127.0.0.1:7777', + 'http://:7777': 'http://127.0.0.1:7777', + 'https://kokia.jp:2375': 'https://kokia.jp:2375', + 'unix:///var/run/docker.sock': 'http+unix:///var/run/docker.sock', + 'unix://': 'http+unix:///var/run/docker.sock', + '12.234.45.127:2375/docker/engine': ( + 'http://12.234.45.127:2375/docker/engine' + ), + 'somehost.net:80/service/swarm': ( + 'http://somehost.net:80/service/swarm' + ), + 'npipe:////./pipe/docker_engine': 'npipe:////./pipe/docker_engine', + '[fd12::82d1]:2375': 'http://[fd12::82d1]:2375', + 'https://[fd12:5672::12aa]:1090': 'https://[fd12:5672::12aa]:1090', + '[fd12::82d1]:2375/docker/engine': ( + 'http://[fd12::82d1]:2375/docker/engine' + ), + 'ssh://[fd12::82d1]': 'ssh://[fd12::82d1]:22', + 'ssh://user@[fd12::82d1]:8765': 'ssh://user@[fd12::82d1]:8765', + 'ssh://': 'ssh://127.0.0.1:22', + 'ssh://user@localhost:22': 'ssh://user@localhost:22', + 'ssh://user@remote': 'ssh://user@remote:22', + } + + for host in invalid_hosts: + msg = 'Should have failed to parse invalid host: {0}'.format(host) + with self.assertRaises(DockerException, msg=msg): + parse_host(host, None) + + for host, expected in valid_hosts.items(): + self.assertEqual( + parse_host(host, None), + expected, + msg='Failed to parse valid host: {0}'.format(host), + ) + + def test_parse_host_empty_value(self): + unix_socket = 'http+unix:///var/run/docker.sock' + npipe = 'npipe:////./pipe/docker_engine' + + for val in [None, '']: + assert parse_host(val, is_win32=False) == unix_socket + assert parse_host(val, is_win32=True) == npipe + + def test_parse_host_tls(self): + host_value = 'myhost.docker.net:3348' + expected_result = 'https://myhost.docker.net:3348' + assert parse_host(host_value, tls=True) == expected_result + + def test_parse_host_tls_tcp_proto(self): + host_value = 'tcp://myhost.docker.net:3348' + expected_result = 'https://myhost.docker.net:3348' + assert parse_host(host_value, tls=True) == expected_result + + def test_parse_host_trailing_slash(self): + host_value = 'tcp://myhost.docker.net:2376/' + expected_result = 'http://myhost.docker.net:2376' + assert parse_host(host_value) == expected_result + + +class ParseRepositoryTagTest(unittest.TestCase): + sha = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' + + def test_index_image_no_tag(self): + assert parse_repository_tag("root") == ("root", None) + + def test_index_image_tag(self): + assert parse_repository_tag("root:tag") == ("root", "tag") + + def test_index_user_image_no_tag(self): + assert parse_repository_tag("user/repo") == ("user/repo", None) + + def test_index_user_image_tag(self): + assert parse_repository_tag("user/repo:tag") == ("user/repo", "tag") + + def test_private_reg_image_no_tag(self): + assert parse_repository_tag("url:5000/repo") == ("url:5000/repo", None) + + def test_private_reg_image_tag(self): + assert parse_repository_tag("url:5000/repo:tag") == ( + "url:5000/repo", "tag" + ) + + def test_index_image_sha(self): + assert parse_repository_tag("root@sha256:{sha}".format(sha=self.sha)) == ( + "root", "sha256:{sha}".format(sha=self.sha) + ) + + def test_private_reg_image_sha(self): + assert parse_repository_tag( + "url:5000/repo@sha256:{sha}".format(sha=self.sha) + ) == ("url:5000/repo", "sha256:{sha}".format(sha=self.sha)) + + +class ParseDeviceTest(unittest.TestCase): + def test_dict(self): + devices = parse_devices([{ + 'PathOnHost': '/dev/sda1', + 'PathInContainer': '/dev/mnt1', + 'CgroupPermissions': 'r' + }]) + assert devices[0] == { + 'PathOnHost': '/dev/sda1', + 'PathInContainer': '/dev/mnt1', + 'CgroupPermissions': 'r' + } + + def test_partial_string_definition(self): + devices = parse_devices(['/dev/sda1']) + assert devices[0] == { + 'PathOnHost': '/dev/sda1', + 'PathInContainer': '/dev/sda1', + 'CgroupPermissions': 'rwm' + } + + def test_permissionless_string_definition(self): + devices = parse_devices(['/dev/sda1:/dev/mnt1']) + assert devices[0] == { + 'PathOnHost': '/dev/sda1', + 'PathInContainer': '/dev/mnt1', + 'CgroupPermissions': 'rwm' + } + + def test_full_string_definition(self): + devices = parse_devices(['/dev/sda1:/dev/mnt1:r']) + assert devices[0] == { + 'PathOnHost': '/dev/sda1', + 'PathInContainer': '/dev/mnt1', + 'CgroupPermissions': 'r' + } + + def test_hybrid_list(self): + devices = parse_devices([ + '/dev/sda1:/dev/mnt1:rw', + { + 'PathOnHost': '/dev/sda2', + 'PathInContainer': '/dev/mnt2', + 'CgroupPermissions': 'r' + } + ]) + + assert devices[0] == { + 'PathOnHost': '/dev/sda1', + 'PathInContainer': '/dev/mnt1', + 'CgroupPermissions': 'rw' + } + assert devices[1] == { + 'PathOnHost': '/dev/sda2', + 'PathInContainer': '/dev/mnt2', + 'CgroupPermissions': 'r' + } + + +class ParseBytesTest(unittest.TestCase): + def test_parse_bytes_valid(self): + assert parse_bytes("512MB") == 536870912 + assert parse_bytes("512M") == 536870912 + assert parse_bytes("512m") == 536870912 + + def test_parse_bytes_invalid(self): + with pytest.raises(DockerException): + parse_bytes("512MK") + with pytest.raises(DockerException): + parse_bytes("512L") + with pytest.raises(DockerException): + parse_bytes("127.0.0.1K") + + def test_parse_bytes_float(self): + assert parse_bytes("1.5k") == 1536 + + +class UtilsTest(unittest.TestCase): + longMessage = True + + def test_convert_filters(self): + tests = [ + ({'dangling': True}, '{"dangling": ["true"]}'), + ({'dangling': "true"}, '{"dangling": ["true"]}'), + ({'exited': 0}, '{"exited": ["0"]}'), + ({'exited': [0, 1]}, '{"exited": ["0", "1"]}'), + ] + + for filters, expected in tests: + assert convert_filters(filters) == expected + + def test_decode_json_header(self): + obj = {'a': 'b', 'c': 1} + data = None + if PY3: + data = base64.urlsafe_b64encode(bytes(json.dumps(obj), 'utf-8')) + else: + data = base64.urlsafe_b64encode(json.dumps(obj)) + decoded_data = decode_json_header(data) + assert obj == decoded_data + + +class SplitCommandTest(unittest.TestCase): + def test_split_command_with_unicode(self): + assert split_command(u'echo μμ') == ['echo', 'μμ'] + + @pytest.mark.skipif(PY3, reason="shlex doesn't support bytes in py3") + def test_split_command_with_bytes(self): + assert split_command('echo μμ') == ['echo', 'μμ'] + + +class FormatEnvironmentTest(unittest.TestCase): + def test_format_env_binary_unicode_value(self): + env_dict = { + 'ARTIST_NAME': b'\xec\x86\xa1\xec\xa7\x80\xec\x9d\x80' + } + assert format_environment(env_dict) == [u'ARTIST_NAME=송지은'] + + def test_format_env_no_value(self): + env_dict = { + 'FOO': None, + 'BAR': '', + } + assert sorted(format_environment(env_dict)) == ['BAR=', 'FOO'] diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/ca.pem b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/ca.pem new file mode 100644 index 00000000..5c7093a3 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/ca.pem @@ -0,0 +1,7 @@ +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/cert.pem b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/cert.pem new file mode 100644 index 00000000..5c7093a3 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/cert.pem @@ -0,0 +1,7 @@ +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/key.pem b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/key.pem new file mode 100644 index 00000000..5c7093a3 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/key.pem @@ -0,0 +1,7 @@ +# This code is part of the Ansible collection community.docker, but is an independent component. +# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/) +# +# Copyright (c) 2016-2022 Docker, Inc. +# +# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection) +# SPDX-License-Identifier: Apache-2.0 diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test__scramble.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test__scramble.py new file mode 100644 index 00000000..ff004306 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test__scramble.py @@ -0,0 +1,28 @@ +# Copyright 2022 Red Hat | Ansible +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible_collections.community.docker.plugins.module_utils._scramble import ( + scramble, + unscramble, +) + + +@pytest.mark.parametrize('plaintext, key, scrambled', [ + (u'', b'0', '=S='), + (u'hello', b'\x00', '=S=aGVsbG8='), + (u'hello', b'\x01', '=S=aWRtbW4='), +]) +def test_scramble_unscramble(plaintext, key, scrambled): + scrambled_ = scramble(plaintext, key) + print('{0!r} == {1!r}'.format(scrambled_, scrambled)) + assert scrambled_ == scrambled + + plaintext_ = unscramble(scrambled, key) + print('{0!r} == {1!r}'.format(plaintext_, plaintext)) + assert plaintext_ == plaintext diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_copy.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_copy.py new file mode 100644 index 00000000..3668573b --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_copy.py @@ -0,0 +1,77 @@ +# Copyright 2022 Red Hat | Ansible +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible_collections.community.docker.plugins.module_utils.copy import ( + _stream_generator_to_fileobj, +) + + +def _simple_generator(sequence): + for elt in sequence: + yield elt + + +@pytest.mark.parametrize('chunks, read_sizes', [ + ( + [ + (1, b'1'), + (1, b'2'), + (1, b'3'), + (1, b'4'), + ], + [ + 1, + 2, + 3, + ] + ), + ( + [ + (1, b'123'), + (1, b'456'), + (1, b'789'), + ], + [ + 1, + 4, + 2, + 2, + 2, + ] + ), + ( + [ + (10 * 1024 * 1024, b'0'), + (10 * 1024 * 1024, b'1'), + ], + [ + 1024 * 1024 - 5, + 5 * 1024 * 1024 - 3, + 10 * 1024 * 1024 - 2, + 2 * 1024 * 1024 - 1, + 2 * 1024 * 1024 + 5 + 3 + 2 + 1, + ] + ), +]) +def test__stream_generator_to_fileobj(chunks, read_sizes): + chunks = [count * data for count, data in chunks] + stream = _simple_generator(chunks) + expected = b''.join(chunks) + + buffer = b'' + totally_read = 0 + f = _stream_generator_to_fileobj(stream) + for read_size in read_sizes: + chunk = f.read(read_size) + assert len(chunk) == min(read_size, len(expected) - len(buffer)) + buffer += chunk + totally_read += read_size + + assert buffer == expected[:len(buffer)] + assert min(totally_read, len(expected)) == len(buffer) diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_image_archive.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_image_archive.py new file mode 100644 index 00000000..10573b96 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_image_archive.py @@ -0,0 +1,94 @@ +# Copyright 2022 Red Hat | Ansible +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest +import tarfile + +from ansible_collections.community.docker.plugins.module_utils.image_archive import ( + api_image_id, + archived_image_manifest, + ImageArchiveInvalidException +) + +from ..test_support.docker_image_archive_stubbing import ( + write_imitation_archive, + write_imitation_archive_with_manifest, + write_irrelevant_tar, +) + + +@pytest.fixture +def tar_file_name(tmpdir): + ''' + Return the name of a non-existing tar file in an existing temporary directory. + ''' + + # Cast to str required by Python 2.x + return str(tmpdir.join('foo.tar')) + + +@pytest.mark.parametrize('expected, value', [ + ('sha256:foo', 'foo'), + ('sha256:bar', 'bar') +]) +def test_api_image_id_from_archive_id(expected, value): + assert api_image_id(value) == expected + + +def test_archived_image_manifest_extracts(tar_file_name): + expected_id = "abcde12345" + expected_tags = ["foo:latest", "bar:v1"] + + write_imitation_archive(tar_file_name, expected_id, expected_tags) + + actual = archived_image_manifest(tar_file_name) + + assert actual.image_id == expected_id + assert actual.repo_tags == expected_tags + + +def test_archived_image_manifest_extracts_nothing_when_file_not_present(tar_file_name): + image_id = archived_image_manifest(tar_file_name) + + assert image_id is None + + +def test_archived_image_manifest_raises_when_file_not_a_tar(): + try: + archived_image_manifest(__file__) + raise AssertionError() + except ImageArchiveInvalidException as e: + assert isinstance(e.cause, tarfile.ReadError) + assert str(__file__) in str(e) + + +def test_archived_image_manifest_raises_when_tar_missing_manifest(tar_file_name): + write_irrelevant_tar(tar_file_name) + + try: + archived_image_manifest(tar_file_name) + raise AssertionError() + except ImageArchiveInvalidException as e: + assert isinstance(e.cause, KeyError) + assert 'manifest.json' in str(e.cause) + + +def test_archived_image_manifest_raises_when_manifest_missing_id(tar_file_name): + manifest = [ + { + 'foo': 'bar' + } + ] + + write_imitation_archive_with_manifest(tar_file_name, manifest) + + try: + archived_image_manifest(tar_file_name) + raise AssertionError() + except ImageArchiveInvalidException as e: + assert isinstance(e.cause, KeyError) + assert 'Config' in str(e.cause) diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_util.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_util.py new file mode 100644 index 00000000..c7d36212 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_util.py @@ -0,0 +1,522 @@ +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible_collections.community.docker.plugins.module_utils.util import ( + compare_dict_allow_more_present, + compare_generic, + convert_duration_to_nanosecond, + parse_healthcheck +) + +DICT_ALLOW_MORE_PRESENT = ( + { + 'av': {}, + 'bv': {'a': 1}, + 'result': True + }, + { + 'av': {'a': 1}, + 'bv': {'a': 1, 'b': 2}, + 'result': True + }, + { + 'av': {'a': 1}, + 'bv': {'b': 2}, + 'result': False + }, + { + 'av': {'a': 1}, + 'bv': {'a': None, 'b': 1}, + 'result': False + }, + { + 'av': {'a': None}, + 'bv': {'b': 1}, + 'result': False + }, +) + +COMPARE_GENERIC = [ + ######################################################################################## + # value + { + 'a': 1, + 'b': 2, + 'method': 'strict', + 'type': 'value', + 'result': False + }, + { + 'a': 'hello', + 'b': 'hello', + 'method': 'strict', + 'type': 'value', + 'result': True + }, + { + 'a': None, + 'b': 'hello', + 'method': 'strict', + 'type': 'value', + 'result': False + }, + { + 'a': None, + 'b': None, + 'method': 'strict', + 'type': 'value', + 'result': True + }, + { + 'a': 1, + 'b': 2, + 'method': 'ignore', + 'type': 'value', + 'result': True + }, + { + 'a': None, + 'b': 2, + 'method': 'ignore', + 'type': 'value', + 'result': True + }, + ######################################################################################## + # list + { + 'a': [ + 'x', + ], + 'b': [ + 'y', + ], + 'method': 'strict', + 'type': 'list', + 'result': False + }, + { + 'a': [ + 'x', + ], + 'b': [ + 'x', + 'x', + ], + 'method': 'strict', + 'type': 'list', + 'result': False + }, + { + 'a': [ + 'x', + 'y', + ], + 'b': [ + 'x', + 'y', + ], + 'method': 'strict', + 'type': 'list', + 'result': True + }, + { + 'a': [ + 'x', + 'y', + ], + 'b': [ + 'y', + 'x', + ], + 'method': 'strict', + 'type': 'list', + 'result': False + }, + { + 'a': [ + 'x', + 'y', + ], + 'b': [ + 'x', + ], + 'method': 'allow_more_present', + 'type': 'list', + 'result': False + }, + { + 'a': [ + 'x', + ], + 'b': [ + 'x', + 'y', + ], + 'method': 'allow_more_present', + 'type': 'list', + 'result': True + }, + { + 'a': [ + 'x', + 'x', + 'y', + ], + 'b': [ + 'x', + 'y', + ], + 'method': 'allow_more_present', + 'type': 'list', + 'result': False + }, + { + 'a': [ + 'x', + 'z', + ], + 'b': [ + 'x', + 'y', + 'x', + 'z', + ], + 'method': 'allow_more_present', + 'type': 'list', + 'result': True + }, + { + 'a': [ + 'x', + 'y', + ], + 'b': [ + 'y', + 'x', + ], + 'method': 'ignore', + 'type': 'list', + 'result': True + }, + ######################################################################################## + # set + { + 'a': [ + 'x', + ], + 'b': [ + 'y', + ], + 'method': 'strict', + 'type': 'set', + 'result': False + }, + { + 'a': [ + 'x', + ], + 'b': [ + 'x', + 'x', + ], + 'method': 'strict', + 'type': 'set', + 'result': True + }, + { + 'a': [ + 'x', + 'y', + ], + 'b': [ + 'x', + 'y', + ], + 'method': 'strict', + 'type': 'set', + 'result': True + }, + { + 'a': [ + 'x', + 'y', + ], + 'b': [ + 'y', + 'x', + ], + 'method': 'strict', + 'type': 'set', + 'result': True + }, + { + 'a': [ + 'x', + 'y', + ], + 'b': [ + 'x', + ], + 'method': 'allow_more_present', + 'type': 'set', + 'result': False + }, + { + 'a': [ + 'x', + ], + 'b': [ + 'x', + 'y', + ], + 'method': 'allow_more_present', + 'type': 'set', + 'result': True + }, + { + 'a': [ + 'x', + 'x', + 'y', + ], + 'b': [ + 'x', + 'y', + ], + 'method': 'allow_more_present', + 'type': 'set', + 'result': True + }, + { + 'a': [ + 'x', + 'z', + ], + 'b': [ + 'x', + 'y', + 'x', + 'z', + ], + 'method': 'allow_more_present', + 'type': 'set', + 'result': True + }, + { + 'a': [ + 'x', + 'a', + ], + 'b': [ + 'y', + 'z', + ], + 'method': 'ignore', + 'type': 'set', + 'result': True + }, + ######################################################################################## + # set(dict) + { + 'a': [ + {'x': 1}, + ], + 'b': [ + {'y': 1}, + ], + 'method': 'strict', + 'type': 'set(dict)', + 'result': False + }, + { + 'a': [ + {'x': 1}, + ], + 'b': [ + {'x': 1}, + ], + 'method': 'strict', + 'type': 'set(dict)', + 'result': True + }, + { + 'a': [ + {'x': 1}, + ], + 'b': [ + {'x': 1, 'y': 2}, + ], + 'method': 'strict', + 'type': 'set(dict)', + 'result': True + }, + { + 'a': [ + {'x': 1}, + {'x': 2, 'y': 3}, + ], + 'b': [ + {'x': 1}, + {'x': 2, 'y': 3}, + ], + 'method': 'strict', + 'type': 'set(dict)', + 'result': True + }, + { + 'a': [ + {'x': 1}, + ], + 'b': [ + {'x': 1, 'z': 2}, + {'x': 2, 'y': 3}, + ], + 'method': 'allow_more_present', + 'type': 'set(dict)', + 'result': True + }, + { + 'a': [ + {'x': 1, 'y': 2}, + ], + 'b': [ + {'x': 1}, + {'x': 2, 'y': 3}, + ], + 'method': 'allow_more_present', + 'type': 'set(dict)', + 'result': False + }, + { + 'a': [ + {'x': 1, 'y': 3}, + ], + 'b': [ + {'x': 1}, + {'x': 1, 'y': 3, 'z': 4}, + ], + 'method': 'allow_more_present', + 'type': 'set(dict)', + 'result': True + }, + { + 'a': [ + {'x': 1}, + {'x': 2, 'y': 3}, + ], + 'b': [ + {'x': 1}, + ], + 'method': 'ignore', + 'type': 'set(dict)', + 'result': True + }, + ######################################################################################## + # dict + { + 'a': {'x': 1}, + 'b': {'y': 1}, + 'method': 'strict', + 'type': 'dict', + 'result': False + }, + { + 'a': {'x': 1}, + 'b': {'x': 1, 'y': 2}, + 'method': 'strict', + 'type': 'dict', + 'result': False + }, + { + 'a': {'x': 1}, + 'b': {'x': 1}, + 'method': 'strict', + 'type': 'dict', + 'result': True + }, + { + 'a': {'x': 1, 'z': 2}, + 'b': {'x': 1, 'y': 2}, + 'method': 'strict', + 'type': 'dict', + 'result': False + }, + { + 'a': {'x': 1, 'z': 2}, + 'b': {'x': 1, 'y': 2}, + 'method': 'ignore', + 'type': 'dict', + 'result': True + }, +] + [{ + 'a': entry['av'], + 'b': entry['bv'], + 'method': 'allow_more_present', + 'type': 'dict', + 'result': entry['result'] +} for entry in DICT_ALLOW_MORE_PRESENT] + + +@pytest.mark.parametrize("entry", DICT_ALLOW_MORE_PRESENT) +def test_dict_allow_more_present(entry): + assert compare_dict_allow_more_present(entry['av'], entry['bv']) == entry['result'] + + +@pytest.mark.parametrize("entry", COMPARE_GENERIC) +def test_compare_generic(entry): + assert compare_generic(entry['a'], entry['b'], entry['method'], entry['type']) == entry['result'] + + +def test_convert_duration_to_nanosecond(): + nanoseconds = convert_duration_to_nanosecond('5s') + assert nanoseconds == 5000000000 + nanoseconds = convert_duration_to_nanosecond('1m5s') + assert nanoseconds == 65000000000 + with pytest.raises(ValueError): + convert_duration_to_nanosecond([1, 2, 3]) + with pytest.raises(ValueError): + convert_duration_to_nanosecond('10x') + + +def test_parse_healthcheck(): + result, disabled = parse_healthcheck({ + 'test': 'sleep 1', + 'interval': '1s', + }) + assert disabled is False + assert result == { + 'test': ['CMD-SHELL', 'sleep 1'], + 'interval': 1000000000 + } + + result, disabled = parse_healthcheck({ + 'test': ['NONE'], + }) + assert result is None + assert disabled + + result, disabled = parse_healthcheck({ + 'test': 'sleep 1', + 'interval': '1s423ms' + }) + assert result == { + 'test': ['CMD-SHELL', 'sleep 1'], + 'interval': 1423000000 + } + assert disabled is False + + result, disabled = parse_healthcheck({ + 'test': 'sleep 1', + 'interval': '1h1m2s3ms4us' + }) + assert result == { + 'test': ['CMD-SHELL', 'sleep 1'], + 'interval': 3662003004000 + } + assert disabled is False diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/conftest.py b/ansible_collections/community/docker/tests/unit/plugins/modules/conftest.py new file mode 100644 index 00000000..0ed3dd44 --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/modules/conftest.py @@ -0,0 +1,32 @@ +# Copyright (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json + +import pytest + +from ansible.module_utils.six import string_types +from ansible.module_utils.common.text.converters import to_bytes +from ansible.module_utils.common._collections_compat import MutableMapping + + +@pytest.fixture +def patch_ansible_module(request, mocker): + if isinstance(request.param, string_types): + args = request.param + elif isinstance(request.param, MutableMapping): + if 'ANSIBLE_MODULE_ARGS' not in request.param: + request.param = {'ANSIBLE_MODULE_ARGS': request.param} + if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']: + request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp' + if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']: + request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False + args = json.dumps(request.param) + else: + raise Exception('Malformed data to the patch_ansible_module pytest fixture') + + mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args)) diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_image.py b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_image.py new file mode 100644 index 00000000..3401837f --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_image.py @@ -0,0 +1,114 @@ +# Copyright 2022 Red Hat | Ansible +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible_collections.community.docker.plugins.modules.docker_image import ImageManager + +from ansible_collections.community.docker.plugins.module_utils.image_archive import api_image_id + +from ..test_support.docker_image_archive_stubbing import ( + write_imitation_archive, + write_irrelevant_tar, +) + + +def assert_no_logging(msg): + raise AssertionError('Should not have logged anything but logged %s' % msg) + + +def capture_logging(messages): + def capture(msg): + messages.append(msg) + + return capture + + +@pytest.fixture +def tar_file_name(tmpdir): + """ + Return the name of a non-existing tar file in an existing temporary directory. + """ + + # Cast to str required by Python 2.x + return str(tmpdir.join('foo.tar')) + + +def test_archived_image_action_when_missing(tar_file_name): + fake_name = 'a:latest' + fake_id = 'a1' + + expected = 'Archived image %s to %s, since none present' % (fake_name, tar_file_name) + + actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, fake_name, api_image_id(fake_id)) + + assert actual == expected + + +def test_archived_image_action_when_current(tar_file_name): + fake_name = 'b:latest' + fake_id = 'b2' + + write_imitation_archive(tar_file_name, fake_id, [fake_name]) + + actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, fake_name, api_image_id(fake_id)) + + assert actual is None + + +def test_archived_image_action_when_invalid(tar_file_name): + fake_name = 'c:1.2.3' + fake_id = 'c3' + + write_irrelevant_tar(tar_file_name) + + expected = 'Archived image %s to %s, overwriting an unreadable archive file' % (fake_name, tar_file_name) + + actual_log = [] + actual = ImageManager.archived_image_action( + capture_logging(actual_log), + tar_file_name, + fake_name, + api_image_id(fake_id) + ) + + assert actual == expected + + assert len(actual_log) == 1 + assert actual_log[0].startswith('Unable to extract manifest summary from archive') + + +def test_archived_image_action_when_obsolete_by_id(tar_file_name): + fake_name = 'd:0.0.1' + old_id = 'e5' + new_id = 'd4' + + write_imitation_archive(tar_file_name, old_id, [fake_name]) + + expected = 'Archived image %s to %s, overwriting archive with image %s named %s' % ( + fake_name, tar_file_name, old_id, fake_name + ) + actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, fake_name, api_image_id(new_id)) + + assert actual == expected + + +def test_archived_image_action_when_obsolete_by_name(tar_file_name): + old_name = 'hi' + new_name = 'd:0.0.1' + fake_id = 'd4' + + write_imitation_archive(tar_file_name, fake_id, [old_name]) + + expected = 'Archived image %s to %s, overwriting archive with image %s named %s' % ( + new_name, tar_file_name, fake_id, old_name + ) + actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, new_name, api_image_id(fake_id)) + + print('actual : %s', actual) + print('expected : %s', expected) + assert actual == expected diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_network.py b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_network.py new file mode 100644 index 00000000..a937c6db --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_network.py @@ -0,0 +1,35 @@ +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +"""Unit tests for docker_network.""" + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + +from ansible_collections.community.docker.plugins.modules.docker_network import validate_cidr + + +@pytest.mark.parametrize("cidr,expected", [ + ('192.168.0.1/16', 'ipv4'), + ('192.168.0.1/24', 'ipv4'), + ('192.168.0.1/32', 'ipv4'), + ('fdd1:ac8c:0557:7ce2::/64', 'ipv6'), + ('fdd1:ac8c:0557:7ce2::/128', 'ipv6'), +]) +def test_validate_cidr_positives(cidr, expected): + assert validate_cidr(cidr) == expected + + +@pytest.mark.parametrize("cidr", [ + '192.168.0.1', + '192.168.0.1/34', + '192.168.0.1/asd', + 'fdd1:ac8c:0557:7ce2::', +]) +def test_validate_cidr_negatives(cidr): + with pytest.raises(ValueError) as e: + validate_cidr(cidr) + assert '"{0}" is not a valid CIDR'.format(cidr) == str(e.value) diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_swarm_service.py b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_swarm_service.py new file mode 100644 index 00000000..1cef623b --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_swarm_service.py @@ -0,0 +1,514 @@ +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest + + +class APIErrorMock(Exception): + def __init__(self, message, response=None, explanation=None): + self.message = message + self.response = response + self.explanation = explanation + + +@pytest.fixture(autouse=True) +def docker_module_mock(mocker): + docker_module_mock = mocker.MagicMock() + docker_utils_module_mock = mocker.MagicMock() + docker_errors_module_mock = mocker.MagicMock() + docker_errors_module_mock.APIError = APIErrorMock + mock_modules = { + 'docker': docker_module_mock, + 'docker.utils': docker_utils_module_mock, + 'docker.errors': docker_errors_module_mock, + } + return mocker.patch.dict('sys.modules', **mock_modules) + + +@pytest.fixture(autouse=True) +def docker_swarm_service(): + from ansible_collections.community.docker.plugins.modules import docker_swarm_service + + return docker_swarm_service + + +def test_retry_on_out_of_sequence_error(mocker, docker_swarm_service): + run_mock = mocker.MagicMock( + side_effect=APIErrorMock( + message='', + response=None, + explanation='rpc error: code = Unknown desc = update out of sequence', + ) + ) + manager = docker_swarm_service.DockerServiceManager(client=None) + manager.run = run_mock + with pytest.raises(APIErrorMock): + manager.run_safe() + assert run_mock.call_count == 3 + + +def test_no_retry_on_general_api_error(mocker, docker_swarm_service): + run_mock = mocker.MagicMock( + side_effect=APIErrorMock(message='', response=None, explanation='some error') + ) + manager = docker_swarm_service.DockerServiceManager(client=None) + manager.run = run_mock + with pytest.raises(APIErrorMock): + manager.run_safe() + assert run_mock.call_count == 1 + + +def test_get_docker_environment(mocker, docker_swarm_service): + env_file_result = {'TEST1': 'A', 'TEST2': 'B', 'TEST3': 'C'} + env_dict = {'TEST3': 'CC', 'TEST4': 'D'} + env_string = "TEST3=CC,TEST4=D" + + env_list = ['TEST3=CC', 'TEST4=D'] + expected_result = sorted(['TEST1=A', 'TEST2=B', 'TEST3=CC', 'TEST4=D']) + mocker.patch.object( + docker_swarm_service, 'parse_env_file', return_value=env_file_result + ) + mocker.patch.object( + docker_swarm_service, + 'format_environment', + side_effect=lambda d: ['{0}={1}'.format(key, value) for key, value in d.items()], + ) + # Test with env dict and file + result = docker_swarm_service.get_docker_environment( + env_dict, env_files=['dummypath'] + ) + assert result == expected_result + # Test with env list and file + result = docker_swarm_service.get_docker_environment( + env_list, + env_files=['dummypath'] + ) + assert result == expected_result + # Test with env string and file + result = docker_swarm_service.get_docker_environment( + env_string, env_files=['dummypath'] + ) + assert result == expected_result + + assert result == expected_result + # Test with empty env + result = docker_swarm_service.get_docker_environment( + [], env_files=None + ) + assert result == [] + # Test with empty env_files + result = docker_swarm_service.get_docker_environment( + None, env_files=[] + ) + assert result == [] + + +def test_get_nanoseconds_from_raw_option(docker_swarm_service): + value = docker_swarm_service.get_nanoseconds_from_raw_option('test', None) + assert value is None + + value = docker_swarm_service.get_nanoseconds_from_raw_option('test', '1m30s535ms') + assert value == 90535000000 + + value = docker_swarm_service.get_nanoseconds_from_raw_option('test', 10000000000) + assert value == 10000000000 + + with pytest.raises(ValueError): + docker_swarm_service.get_nanoseconds_from_raw_option('test', []) + + +def test_has_dict_changed(docker_swarm_service): + assert not docker_swarm_service.has_dict_changed( + {"a": 1}, + {"a": 1}, + ) + assert not docker_swarm_service.has_dict_changed( + {"a": 1}, + {"a": 1, "b": 2} + ) + assert docker_swarm_service.has_dict_changed( + {"a": 1}, + {"a": 2, "b": 2} + ) + assert docker_swarm_service.has_dict_changed( + {"a": 1, "b": 1}, + {"a": 1} + ) + assert not docker_swarm_service.has_dict_changed( + None, + {"a": 2, "b": 2} + ) + assert docker_swarm_service.has_dict_changed( + {}, + {"a": 2, "b": 2} + ) + assert docker_swarm_service.has_dict_changed( + {"a": 1}, + {} + ) + assert docker_swarm_service.has_dict_changed( + {"a": 1}, + None + ) + assert not docker_swarm_service.has_dict_changed( + {}, + {} + ) + assert not docker_swarm_service.has_dict_changed( + None, + None + ) + assert not docker_swarm_service.has_dict_changed( + {}, + None + ) + assert not docker_swarm_service.has_dict_changed( + None, + {} + ) + + +def test_has_list_changed(docker_swarm_service): + + # List comparisons without dictionaries + # I could improve the indenting, but pycodestyle wants this instead + assert not docker_swarm_service.has_list_changed(None, None) + assert not docker_swarm_service.has_list_changed(None, []) + assert not docker_swarm_service.has_list_changed(None, [1, 2]) + + assert not docker_swarm_service.has_list_changed([], None) + assert not docker_swarm_service.has_list_changed([], []) + assert docker_swarm_service.has_list_changed([], [1, 2]) + + assert docker_swarm_service.has_list_changed([1, 2], None) + assert docker_swarm_service.has_list_changed([1, 2], []) + + assert docker_swarm_service.has_list_changed([1, 2, 3], [1, 2]) + assert docker_swarm_service.has_list_changed([1, 2], [1, 2, 3]) + + # Check list sorting + assert not docker_swarm_service.has_list_changed([1, 2], [2, 1]) + assert docker_swarm_service.has_list_changed( + [1, 2], + [2, 1], + sort_lists=False + ) + + # Check type matching + assert docker_swarm_service.has_list_changed([None, 1], [2, 1]) + assert docker_swarm_service.has_list_changed([2, 1], [None, 1]) + assert docker_swarm_service.has_list_changed( + "command --with args", + ['command', '--with', 'args'] + ) + assert docker_swarm_service.has_list_changed( + ['sleep', '3400'], + [u'sleep', u'3600'], + sort_lists=False + ) + + # List comparisons with dictionaries + assert not docker_swarm_service.has_list_changed( + [{'a': 1}], + [{'a': 1}], + sort_key='a' + ) + + assert not docker_swarm_service.has_list_changed( + [{'a': 1}, {'a': 2}], + [{'a': 1}, {'a': 2}], + sort_key='a' + ) + + with pytest.raises(Exception): + docker_swarm_service.has_list_changed( + [{'a': 1}, {'a': 2}], + [{'a': 1}, {'a': 2}] + ) + + # List sort checking with sort key + assert not docker_swarm_service.has_list_changed( + [{'a': 1}, {'a': 2}], + [{'a': 2}, {'a': 1}], + sort_key='a' + ) + assert docker_swarm_service.has_list_changed( + [{'a': 1}, {'a': 2}], + [{'a': 2}, {'a': 1}], + sort_lists=False + ) + + assert docker_swarm_service.has_list_changed( + [{'a': 1}, {'a': 2}, {'a': 3}], + [{'a': 2}, {'a': 1}], + sort_key='a' + ) + assert docker_swarm_service.has_list_changed( + [{'a': 1}, {'a': 2}], + [{'a': 1}, {'a': 2}, {'a': 3}], + sort_lists=False + ) + + # Additional dictionary elements + assert not docker_swarm_service.has_list_changed( + [ + {"src": 1, "dst": 2}, + {"src": 1, "dst": 2, "protocol": "udp"}, + ], + [ + {"src": 1, "dst": 2, "protocol": "tcp"}, + {"src": 1, "dst": 2, "protocol": "udp"}, + ], + sort_key='dst' + ) + assert not docker_swarm_service.has_list_changed( + [ + {"src": 1, "dst": 2, "protocol": "udp"}, + {"src": 1, "dst": 3, "protocol": "tcp"}, + ], + [ + {"src": 1, "dst": 2, "protocol": "udp"}, + {"src": 1, "dst": 3, "protocol": "tcp"}, + ], + sort_key='dst' + ) + assert docker_swarm_service.has_list_changed( + [ + {"src": 1, "dst": 2, "protocol": "udp"}, + {"src": 1, "dst": 2}, + {"src": 3, "dst": 4}, + ], + [ + {"src": 1, "dst": 3, "protocol": "udp"}, + {"src": 1, "dst": 2, "protocol": "tcp"}, + {"src": 3, "dst": 4, "protocol": "tcp"}, + ], + sort_key='dst' + ) + assert docker_swarm_service.has_list_changed( + [ + {"src": 1, "dst": 3, "protocol": "tcp"}, + {"src": 1, "dst": 2, "protocol": "udp"}, + ], + [ + {"src": 1, "dst": 2, "protocol": "tcp"}, + {"src": 1, "dst": 2, "protocol": "udp"}, + ], + sort_key='dst' + ) + assert docker_swarm_service.has_list_changed( + [ + {"src": 1, "dst": 2, "protocol": "udp"}, + {"src": 1, "dst": 2, "protocol": "tcp", "extra": {"test": "foo"}}, + ], + [ + {"src": 1, "dst": 2, "protocol": "udp"}, + {"src": 1, "dst": 2, "protocol": "tcp"}, + ], + sort_key='dst' + ) + assert not docker_swarm_service.has_list_changed( + [{'id': '123', 'aliases': []}], + [{'id': '123'}], + sort_key='id' + ) + + +def test_have_networks_changed(docker_swarm_service): + assert not docker_swarm_service.have_networks_changed( + None, + None + ) + + assert not docker_swarm_service.have_networks_changed( + [], + None + ) + + assert not docker_swarm_service.have_networks_changed( + [{'id': 1}], + [{'id': 1}] + ) + + assert docker_swarm_service.have_networks_changed( + [{'id': 1}], + [{'id': 1}, {'id': 2}] + ) + + assert not docker_swarm_service.have_networks_changed( + [{'id': 1}, {'id': 2}], + [{'id': 1}, {'id': 2}] + ) + + assert not docker_swarm_service.have_networks_changed( + [{'id': 1}, {'id': 2}], + [{'id': 2}, {'id': 1}] + ) + + assert not docker_swarm_service.have_networks_changed( + [ + {'id': 1}, + {'id': 2, 'aliases': []} + ], + [ + {'id': 1}, + {'id': 2} + ] + ) + + assert docker_swarm_service.have_networks_changed( + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias1']} + ], + [ + {'id': 1}, + {'id': 2} + ] + ) + + assert docker_swarm_service.have_networks_changed( + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias1', 'alias2']} + ], + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias1']} + ] + ) + + assert not docker_swarm_service.have_networks_changed( + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias1', 'alias2']} + ], + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias1', 'alias2']} + ] + ) + + assert not docker_swarm_service.have_networks_changed( + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias1', 'alias2']} + ], + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias2', 'alias1']} + ] + ) + + assert not docker_swarm_service.have_networks_changed( + [ + {'id': 1, 'options': {}}, + {'id': 2, 'aliases': ['alias1', 'alias2']}], + [ + {'id': 1}, + {'id': 2, 'aliases': ['alias2', 'alias1']} + ] + ) + + assert not docker_swarm_service.have_networks_changed( + [ + {'id': 1, 'options': {'option1': 'value1'}}, + {'id': 2, 'aliases': ['alias1', 'alias2']}], + [ + {'id': 1, 'options': {'option1': 'value1'}}, + {'id': 2, 'aliases': ['alias2', 'alias1']} + ] + ) + + assert docker_swarm_service.have_networks_changed( + [ + {'id': 1, 'options': {'option1': 'value1'}}, + {'id': 2, 'aliases': ['alias1', 'alias2']}], + [ + {'id': 1, 'options': {'option1': 'value2'}}, + {'id': 2, 'aliases': ['alias2', 'alias1']} + ] + ) + + +def test_get_docker_networks(docker_swarm_service): + network_names = [ + 'network_1', + 'network_2', + 'network_3', + 'network_4', + ] + networks = [ + network_names[0], + {'name': network_names[1]}, + {'name': network_names[2], 'aliases': ['networkalias1']}, + {'name': network_names[3], 'aliases': ['networkalias2'], 'options': {'foo': 'bar'}}, + ] + network_ids = { + network_names[0]: '1', + network_names[1]: '2', + network_names[2]: '3', + network_names[3]: '4', + } + parsed_networks = docker_swarm_service.get_docker_networks( + networks, + network_ids + ) + assert len(parsed_networks) == 4 + for i, network in enumerate(parsed_networks): + assert 'name' not in network + assert 'id' in network + expected_name = network_names[i] + assert network['id'] == network_ids[expected_name] + if i == 2: + assert network['aliases'] == ['networkalias1'] + if i == 3: + assert network['aliases'] == ['networkalias2'] + if i == 3: + assert 'foo' in network['options'] + # Test missing name + with pytest.raises(TypeError): + docker_swarm_service.get_docker_networks([{'invalid': 'err'}], {'err': 1}) + # test for invalid aliases type + with pytest.raises(TypeError): + docker_swarm_service.get_docker_networks( + [{'name': 'test', 'aliases': 1}], + {'test': 1} + ) + # Test invalid aliases elements + with pytest.raises(TypeError): + docker_swarm_service.get_docker_networks( + [{'name': 'test', 'aliases': [1]}], + {'test': 1} + ) + # Test for invalid options type + with pytest.raises(TypeError): + docker_swarm_service.get_docker_networks( + [{'name': 'test', 'options': 1}], + {'test': 1} + ) + # Test for invalid networks type + with pytest.raises(TypeError): + docker_swarm_service.get_docker_networks( + 1, + {'test': 1} + ) + # Test for non existing networks + with pytest.raises(ValueError): + docker_swarm_service.get_docker_networks( + [{'name': 'idontexist'}], + {'test': 1} + ) + # Test empty values + assert docker_swarm_service.get_docker_networks([], {}) == [] + assert docker_swarm_service.get_docker_networks(None, {}) is None + # Test invalid options + with pytest.raises(TypeError): + docker_swarm_service.get_docker_networks( + [{'name': 'test', 'nonexisting_option': 'foo'}], + {'test': '1'} + ) diff --git a/ansible_collections/community/docker/tests/unit/plugins/test_support/docker_image_archive_stubbing.py b/ansible_collections/community/docker/tests/unit/plugins/test_support/docker_image_archive_stubbing.py new file mode 100644 index 00000000..842ec4cf --- /dev/null +++ b/ansible_collections/community/docker/tests/unit/plugins/test_support/docker_image_archive_stubbing.py @@ -0,0 +1,76 @@ +# Copyright 2022 Red Hat | Ansible +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json +import tarfile +from tempfile import TemporaryFile + + +def write_imitation_archive(file_name, image_id, repo_tags): + ''' + Write a tar file meeting these requirements: + + * Has a file manifest.json + * manifest.json contains a one-element array + * The element has a Config property with "[image_id].json" as the value name + + :param file_name: Name of file to create + :type file_name: str + :param image_id: Fake sha256 hash (without the sha256: prefix) + :type image_id: str + :param repo_tags: list of fake image:tag's + :type repo_tags: list + ''' + + manifest = [ + { + 'Config': '%s.json' % image_id, + 'RepoTags': repo_tags + } + ] + + write_imitation_archive_with_manifest(file_name, manifest) + + +def write_imitation_archive_with_manifest(file_name, manifest): + tf = tarfile.open(file_name, 'w') + try: + with TemporaryFile() as f: + f.write(json.dumps(manifest).encode('utf-8')) + + ti = tarfile.TarInfo('manifest.json') + ti.size = f.tell() + + f.seek(0) + tf.addfile(ti, f) + + finally: + # In Python 2.6, this does not have __exit__ + tf.close() + + +def write_irrelevant_tar(file_name): + ''' + Create a tar file that does not match the spec for "docker image save" / "docker image load" commands. + + :param file_name: Name of tar file to create + :type file_name: str + ''' + + tf = tarfile.open(file_name, 'w') + try: + with TemporaryFile() as f: + f.write('Hello, world.'.encode('utf-8')) + + ti = tarfile.TarInfo('hi.txt') + ti.size = f.tell() + + f.seek(0) + tf.addfile(ti, f) + + finally: + tf.close() |