summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/docker/tests/unit/plugins
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 16:03:42 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 16:03:42 +0000
commit66cec45960ce1d9c794e9399de15c138acb18aed (patch)
tree59cd19d69e9d56b7989b080da7c20ef1a3fe2a5a /ansible_collections/community/docker/tests/unit/plugins
parentInitial commit. (diff)
downloadansible-66cec45960ce1d9c794e9399de15c138acb18aed.tar.xz
ansible-66cec45960ce1d9c794e9399de15c138acb18aed.zip
Adding upstream version 7.3.0+dfsg.upstream/7.3.0+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ansible_collections/community/docker/tests/unit/plugins')
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/connection/test_docker.py57
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/inventory/test_docker_containers.py214
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/api/test_client.py702
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_api.py668
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_stat.py145
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_auth.py819
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_errors.py141
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_sshconn.py57
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_ssladapter.py96
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_build.py515
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_config.py141
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_decorators.py54
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_json_stream.py77
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_ports.py162
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_proxy.py100
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_utils.py488
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/ca.pem7
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/cert.pem7
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/key.pem7
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/test__scramble.py28
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/test_copy.py77
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/test_image_archive.py94
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/module_utils/test_util.py522
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/modules/conftest.py32
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_image.py114
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_network.py35
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_swarm_service.py514
-rw-r--r--ansible_collections/community/docker/tests/unit/plugins/test_support/docker_image_archive_stubbing.py76
28 files changed, 5949 insertions, 0 deletions
diff --git a/ansible_collections/community/docker/tests/unit/plugins/connection/test_docker.py b/ansible_collections/community/docker/tests/unit/plugins/connection/test_docker.py
new file mode 100644
index 00000000..5ae6a8e1
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/connection/test_docker.py
@@ -0,0 +1,57 @@
+# Copyright (c) 2020 Red Hat, Inc.
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from io import StringIO
+
+from ansible_collections.community.docker.tests.unit.compat import mock
+from ansible_collections.community.docker.tests.unit.compat import unittest
+from ansible.errors import AnsibleError
+from ansible.playbook.play_context import PlayContext
+from ansible.plugins.loader import connection_loader
+
+
+class TestDockerConnectionClass(unittest.TestCase):
+
+ def setUp(self):
+ self.play_context = PlayContext()
+ self.play_context.prompt = (
+ '[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: '
+ )
+ self.in_stream = StringIO()
+ with mock.patch('ansible_collections.community.docker.plugins.connection.docker.get_bin_path', return_value='docker'):
+ self.dc = connection_loader.get('community.docker.docker', self.play_context, self.in_stream)
+
+ def tearDown(self):
+ pass
+
+ @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._old_docker_version',
+ return_value=('false', 'garbage', '', 1))
+ @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._new_docker_version',
+ return_value=('docker version', '1.2.3', '', 0))
+ def test_docker_connection_module_too_old(self, mock_new_docker_verison, mock_old_docker_version):
+ self.dc._version = None
+ self.dc.remote_user = 'foo'
+ self.assertRaisesRegexp(AnsibleError, '^docker connection type requires docker 1.3 or higher$', self.dc._get_actual_user)
+
+ @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._old_docker_version',
+ return_value=('false', 'garbage', '', 1))
+ @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._new_docker_version',
+ return_value=('docker version', '1.7.0', '', 0))
+ def test_docker_connection_module(self, mock_new_docker_verison, mock_old_docker_version):
+ self.dc._version = None
+ version = self.dc.docker_version
+
+ # old version and new version fail
+ @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._old_docker_version',
+ return_value=('false', 'garbage', '', 1))
+ @mock.patch('ansible_collections.community.docker.plugins.connection.docker.Connection._new_docker_version',
+ return_value=('false', 'garbage', '', 1))
+ def test_docker_connection_module_wrong_cmd(self, mock_new_docker_version, mock_old_docker_version):
+ self.dc._version = None
+ self.dc.remote_user = 'foo'
+ self.assertRaisesRegexp(AnsibleError, '^Docker version check (.*?) failed: ', self.dc._get_actual_user)
diff --git a/ansible_collections/community/docker/tests/unit/plugins/inventory/test_docker_containers.py b/ansible_collections/community/docker/tests/unit/plugins/inventory/test_docker_containers.py
new file mode 100644
index 00000000..ea16c0d9
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/inventory/test_docker_containers.py
@@ -0,0 +1,214 @@
+# Copyright (c), Felix Fontein <felix@fontein.de>, 2020
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
+import pytest
+
+from ansible.inventory.data import InventoryData
+
+from ansible_collections.community.docker.plugins.inventory.docker_containers import InventoryModule
+
+
+@pytest.fixture(scope="module")
+def inventory():
+ r = InventoryModule()
+ r.inventory = InventoryData()
+ return r
+
+
+LOVING_THARP = {
+ 'Id': '7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a',
+ 'Name': '/loving_tharp',
+ 'Image': 'sha256:349f492ff18add678364a62a67ce9a13487f14293ae0af1baf02398aa432f385',
+ 'State': {
+ 'Running': True,
+ },
+ 'Config': {
+ 'Image': 'quay.io/ansible/ubuntu1804-test-container:1.21.0',
+ },
+}
+
+
+LOVING_THARP_STACK = {
+ 'Id': '7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a',
+ 'Name': '/loving_tharp',
+ 'Image': 'sha256:349f492ff18add678364a62a67ce9a13487f14293ae0af1baf02398aa432f385',
+ 'State': {
+ 'Running': True,
+ },
+ 'Config': {
+ 'Image': 'quay.io/ansible/ubuntu1804-test-container:1.21.0',
+ 'Labels': {
+ 'com.docker.stack.namespace': 'my_stack',
+ },
+ },
+ 'NetworkSettings': {
+ 'Ports': {
+ '22/tcp': [
+ {
+ 'HostIp': '0.0.0.0',
+ 'HostPort': '32802'
+ }
+ ],
+ },
+ },
+}
+
+
+LOVING_THARP_SERVICE = {
+ 'Id': '7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a',
+ 'Name': '/loving_tharp',
+ 'Image': 'sha256:349f492ff18add678364a62a67ce9a13487f14293ae0af1baf02398aa432f385',
+ 'State': {
+ 'Running': True,
+ },
+ 'Config': {
+ 'Image': 'quay.io/ansible/ubuntu1804-test-container:1.21.0',
+ 'Labels': {
+ 'com.docker.swarm.service.name': 'my_service',
+ },
+ },
+}
+
+
+def create_get_option(options, default=False):
+ def get_option(option):
+ if option in options:
+ return options[option]
+ return default
+
+ return get_option
+
+
+class FakeClient(object):
+ def __init__(self, *hosts):
+ self.get_results = {}
+ list_reply = []
+ for host in hosts:
+ list_reply.append({
+ 'Id': host['Id'],
+ 'Names': [host['Name']] if host['Name'] else [],
+ 'Image': host['Config']['Image'],
+ 'ImageId': host['Image'],
+ })
+ self.get_results['/containers/{0}/json'.format(host['Name'])] = host
+ self.get_results['/containers/{0}/json'.format(host['Id'])] = host
+ self.get_results['/containers/json'] = list_reply
+
+ def get_json(self, url, *param, **kwargs):
+ url = url.format(*param)
+ return self.get_results[url]
+
+
+def test_populate(inventory, mocker):
+ client = FakeClient(LOVING_THARP)
+
+ inventory.get_option = mocker.MagicMock(side_effect=create_get_option({
+ 'verbose_output': True,
+ 'connection_type': 'docker-api',
+ 'add_legacy_groups': False,
+ 'compose': {},
+ 'groups': {},
+ 'keyed_groups': {},
+ }))
+ inventory._populate(client)
+
+ host_1 = inventory.inventory.get_host('loving_tharp')
+ host_1_vars = host_1.get_vars()
+
+ assert host_1_vars['ansible_host'] == 'loving_tharp'
+ assert host_1_vars['ansible_connection'] == 'community.docker.docker_api'
+ assert 'ansible_ssh_host' not in host_1_vars
+ assert 'ansible_ssh_port' not in host_1_vars
+ assert 'docker_state' in host_1_vars
+ assert 'docker_config' in host_1_vars
+ assert 'docker_image' in host_1_vars
+
+ assert len(inventory.inventory.groups['ungrouped'].hosts) == 0
+ assert len(inventory.inventory.groups['all'].hosts) == 0
+ assert len(inventory.inventory.groups) == 2
+ assert len(inventory.inventory.hosts) == 1
+
+
+def test_populate_service(inventory, mocker):
+ client = FakeClient(LOVING_THARP_SERVICE)
+
+ inventory.get_option = mocker.MagicMock(side_effect=create_get_option({
+ 'verbose_output': False,
+ 'connection_type': 'docker-cli',
+ 'add_legacy_groups': True,
+ 'compose': {},
+ 'groups': {},
+ 'keyed_groups': {},
+ 'docker_host': 'unix://var/run/docker.sock',
+ }))
+ inventory._populate(client)
+
+ host_1 = inventory.inventory.get_host('loving_tharp')
+ host_1_vars = host_1.get_vars()
+
+ assert host_1_vars['ansible_host'] == 'loving_tharp'
+ assert host_1_vars['ansible_connection'] == 'community.docker.docker'
+ assert 'ansible_ssh_host' not in host_1_vars
+ assert 'ansible_ssh_port' not in host_1_vars
+ assert 'docker_state' not in host_1_vars
+ assert 'docker_config' not in host_1_vars
+ assert 'docker_image' not in host_1_vars
+
+ assert len(inventory.inventory.groups['ungrouped'].hosts) == 0
+ assert len(inventory.inventory.groups['all'].hosts) == 0
+ assert len(inventory.inventory.groups['7bd547963679e'].hosts) == 1
+ assert len(inventory.inventory.groups['7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a'].hosts) == 1
+ assert len(inventory.inventory.groups['image_quay.io/ansible/ubuntu1804-test-container:1.21.0'].hosts) == 1
+ assert len(inventory.inventory.groups['loving_tharp'].hosts) == 1
+ assert len(inventory.inventory.groups['running'].hosts) == 1
+ assert len(inventory.inventory.groups['stopped'].hosts) == 0
+ assert len(inventory.inventory.groups['service_my_service'].hosts) == 1
+ assert len(inventory.inventory.groups['unix://var/run/docker.sock'].hosts) == 1
+ assert len(inventory.inventory.groups) == 10
+ assert len(inventory.inventory.hosts) == 1
+
+
+def test_populate_stack(inventory, mocker):
+ client = FakeClient(LOVING_THARP_STACK)
+
+ inventory.get_option = mocker.MagicMock(side_effect=create_get_option({
+ 'verbose_output': False,
+ 'connection_type': 'ssh',
+ 'add_legacy_groups': True,
+ 'compose': {},
+ 'groups': {},
+ 'keyed_groups': {},
+ 'docker_host': 'unix://var/run/docker.sock',
+ 'default_ip': '127.0.0.1',
+ 'private_ssh_port': 22,
+ }))
+ inventory._populate(client)
+
+ host_1 = inventory.inventory.get_host('loving_tharp')
+ host_1_vars = host_1.get_vars()
+
+ assert host_1_vars['ansible_ssh_host'] == '127.0.0.1'
+ assert host_1_vars['ansible_ssh_port'] == '32802'
+ assert 'ansible_host' not in host_1_vars
+ assert 'ansible_connection' not in host_1_vars
+ assert 'docker_state' not in host_1_vars
+ assert 'docker_config' not in host_1_vars
+ assert 'docker_image' not in host_1_vars
+
+ assert len(inventory.inventory.groups['ungrouped'].hosts) == 0
+ assert len(inventory.inventory.groups['all'].hosts) == 0
+ assert len(inventory.inventory.groups['7bd547963679e'].hosts) == 1
+ assert len(inventory.inventory.groups['7bd547963679e3209cafd52aff21840b755c96fd37abcd7a6e19da8da6a7f49a'].hosts) == 1
+ assert len(inventory.inventory.groups['image_quay.io/ansible/ubuntu1804-test-container:1.21.0'].hosts) == 1
+ assert len(inventory.inventory.groups['loving_tharp'].hosts) == 1
+ assert len(inventory.inventory.groups['running'].hosts) == 1
+ assert len(inventory.inventory.groups['stopped'].hosts) == 0
+ assert len(inventory.inventory.groups['stack_my_stack'].hosts) == 1
+ assert len(inventory.inventory.groups['unix://var/run/docker.sock'].hosts) == 1
+ assert len(inventory.inventory.groups) == 10
+ assert len(inventory.inventory.hosts) == 1
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/api/test_client.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/api/test_client.py
new file mode 100644
index 00000000..ea003565
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/api/test_client.py
@@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import datetime
+import io
+import json
+import os
+import re
+import shutil
+import socket
+import struct
+import tempfile
+import threading
+import time
+import unittest
+import sys
+
+from ansible.module_utils import six
+
+import pytest
+import requests
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api import constants, errors
+from ansible_collections.community.docker.plugins.module_utils._api.api.client import APIClient
+from ansible_collections.community.docker.plugins.module_utils._api.constants import DEFAULT_DOCKER_API_VERSION
+from ansible_collections.community.docker.plugins.module_utils._api.utils.utils import convert_filters
+from requests.packages import urllib3
+
+from .. import fake_api
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+DEFAULT_TIMEOUT_SECONDS = constants.DEFAULT_TIMEOUT_SECONDS
+
+
+def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
+ request=None, raw=None):
+ res = requests.Response()
+ res.status_code = status_code
+ if not isinstance(content, six.binary_type):
+ content = json.dumps(content).encode('ascii')
+ res._content = content
+ res.headers = requests.structures.CaseInsensitiveDict(headers or {})
+ res.reason = reason
+ res.elapsed = datetime.timedelta(elapsed)
+ res.request = request
+ res.raw = raw
+ return res
+
+
+def fake_resolve_authconfig(authconfig, registry=None, *args, **kwargs):
+ return None
+
+
+def fake_inspect_container(self, container, tty=False):
+ return fake_api.get_fake_inspect_container(tty=tty)[1]
+
+
+def fake_resp(method, url, *args, **kwargs):
+ key = None
+ if url in fake_api.fake_responses:
+ key = url
+ elif (url, method) in fake_api.fake_responses:
+ key = (url, method)
+ if not key:
+ raise Exception('{method} {url}'.format(method=method, url=url))
+ status_code, content = fake_api.fake_responses[key]()
+ return response(status_code=status_code, content=content)
+
+
+fake_request = mock.Mock(side_effect=fake_resp)
+
+
+def fake_get(self, url, *args, **kwargs):
+ return fake_request('GET', url, *args, **kwargs)
+
+
+def fake_post(self, url, *args, **kwargs):
+ return fake_request('POST', url, *args, **kwargs)
+
+
+def fake_put(self, url, *args, **kwargs):
+ return fake_request('PUT', url, *args, **kwargs)
+
+
+def fake_delete(self, url, *args, **kwargs):
+ return fake_request('DELETE', url, *args, **kwargs)
+
+
+def fake_read_from_socket(self, response, stream, tty=False, demux=False):
+ return six.binary_type()
+
+
+url_base = '{prefix}/'.format(prefix=fake_api.prefix)
+url_prefix = '{0}v{1}/'.format(
+ url_base,
+ constants.DEFAULT_DOCKER_API_VERSION)
+
+
+class BaseAPIClientTest(unittest.TestCase):
+ def setUp(self):
+ self.patcher = mock.patch.multiple(
+ 'ansible_collections.community.docker.plugins.module_utils._api.api.client.APIClient',
+ get=fake_get,
+ post=fake_post,
+ put=fake_put,
+ delete=fake_delete,
+ _read_from_socket=fake_read_from_socket
+ )
+ self.patcher.start()
+ self.client = APIClient(version=DEFAULT_DOCKER_API_VERSION)
+
+ def tearDown(self):
+ self.client.close()
+ self.patcher.stop()
+
+ def base_create_payload(self, img='busybox', cmd=None):
+ if not cmd:
+ cmd = ['true']
+ return {"Tty": False, "Image": img, "Cmd": cmd,
+ "AttachStdin": False,
+ "AttachStderr": True, "AttachStdout": True,
+ "StdinOnce": False,
+ "OpenStdin": False, "NetworkDisabled": False,
+ }
+
+
+class DockerApiTest(BaseAPIClientTest):
+ def test_ctor(self):
+ with pytest.raises(errors.DockerException) as excinfo:
+ APIClient(version=1.12)
+
+ assert str(
+ excinfo.value
+ ) == 'Version parameter must be a string or None. Found float'
+
+ def test_url_valid_resource(self):
+ url = self.client._url('/hello/{0}/world', 'somename')
+ assert url == '{0}{1}'.format(url_prefix, 'hello/somename/world')
+
+ url = self.client._url(
+ '/hello/{0}/world/{1}', 'somename', 'someothername'
+ )
+ assert url == '{0}{1}'.format(
+ url_prefix, 'hello/somename/world/someothername'
+ )
+
+ url = self.client._url('/hello/{0}/world', 'some?name')
+ assert url == '{0}{1}'.format(url_prefix, 'hello/some%3Fname/world')
+
+ url = self.client._url("/images/{0}/push", "localhost:5000/image")
+ assert url == '{0}{1}'.format(
+ url_prefix, 'images/localhost:5000/image/push'
+ )
+
+ def test_url_invalid_resource(self):
+ with pytest.raises(ValueError):
+ self.client._url('/hello/{0}/world', ['sakuya', 'izayoi'])
+
+ def test_url_no_resource(self):
+ url = self.client._url('/simple')
+ assert url == '{0}{1}'.format(url_prefix, 'simple')
+
+ def test_url_unversioned_api(self):
+ url = self.client._url(
+ '/hello/{0}/world', 'somename', versioned_api=False
+ )
+ assert url == '{0}{1}'.format(url_base, 'hello/somename/world')
+
+ def test_version(self):
+ self.client.version()
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'version',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_version_no_api_version(self):
+ self.client.version(False)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_base + 'version',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_retrieve_server_version(self):
+ client = APIClient(version="auto")
+ assert isinstance(client._version, six.string_types)
+ assert not (client._version == "auto")
+ client.close()
+
+ def test_auto_retrieve_server_version(self):
+ version = self.client._retrieve_server_version()
+ assert isinstance(version, six.string_types)
+
+ def test_info(self):
+ self.client.info()
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'info',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_search(self):
+ self.client.get_json('/images/search', params={'term': 'busybox'})
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/search',
+ params={'term': 'busybox'},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_login(self):
+ self.client.login('sakuya', 'izayoi')
+ args = fake_request.call_args
+ assert args[0][0] == 'POST'
+ assert args[0][1] == url_prefix + 'auth'
+ assert json.loads(args[1]['data']) == {
+ 'username': 'sakuya', 'password': 'izayoi'
+ }
+ assert args[1]['headers'] == {'Content-Type': 'application/json'}
+ assert self.client._auth_configs.auths['docker.io'] == {
+ 'email': None,
+ 'password': 'izayoi',
+ 'username': 'sakuya',
+ 'serveraddress': None,
+ }
+
+ def test_events(self):
+ self.client.events()
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'events',
+ params={'since': None, 'until': None, 'filters': None},
+ stream=True,
+ timeout=None
+ )
+
+ def test_events_with_since_until(self):
+ ts = 1356048000
+ now = datetime.datetime.utcfromtimestamp(ts)
+ since = now - datetime.timedelta(seconds=10)
+ until = now + datetime.timedelta(seconds=10)
+
+ self.client.events(since=since, until=until)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'events',
+ params={
+ 'since': ts - 10,
+ 'until': ts + 10,
+ 'filters': None
+ },
+ stream=True,
+ timeout=None
+ )
+
+ def test_events_with_filters(self):
+ filters = {'event': ['die', 'stop'],
+ 'container': fake_api.FAKE_CONTAINER_ID}
+
+ self.client.events(filters=filters)
+
+ expected_filters = convert_filters(filters)
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'events',
+ params={
+ 'since': None,
+ 'until': None,
+ 'filters': expected_filters
+ },
+ stream=True,
+ timeout=None
+ )
+
+ def _socket_path_for_client_session(self, client):
+ socket_adapter = client.get_adapter('http+docker://')
+ return socket_adapter.socket_path
+
+ def test_url_compatibility_unix(self):
+ c = APIClient(
+ base_url="unix://socket",
+ version=DEFAULT_DOCKER_API_VERSION)
+
+ assert self._socket_path_for_client_session(c) == '/socket'
+
+ def test_url_compatibility_unix_triple_slash(self):
+ c = APIClient(
+ base_url="unix:///socket",
+ version=DEFAULT_DOCKER_API_VERSION)
+
+ assert self._socket_path_for_client_session(c) == '/socket'
+
+ def test_url_compatibility_http_unix_triple_slash(self):
+ c = APIClient(
+ base_url="http+unix:///socket",
+ version=DEFAULT_DOCKER_API_VERSION)
+
+ assert self._socket_path_for_client_session(c) == '/socket'
+
+ def test_url_compatibility_http(self):
+ c = APIClient(
+ base_url="http://hostname:1234",
+ version=DEFAULT_DOCKER_API_VERSION)
+
+ assert c.base_url == "http://hostname:1234"
+
+ def test_url_compatibility_tcp(self):
+ c = APIClient(
+ base_url="tcp://hostname:1234",
+ version=DEFAULT_DOCKER_API_VERSION)
+
+ assert c.base_url == "http://hostname:1234"
+
+ def test_remove_link(self):
+ self.client.delete_call('/containers/{0}', '3cc2351ab11b', params={'v': False, 'link': True, 'force': False})
+
+ fake_request.assert_called_with(
+ 'DELETE',
+ url_prefix + 'containers/3cc2351ab11b',
+ params={'v': False, 'link': True, 'force': False},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_stream_helper_decoding(self):
+ status_code, content = fake_api.fake_responses[url_prefix + 'events']()
+ content_str = json.dumps(content)
+ if six.PY3:
+ content_str = content_str.encode('utf-8')
+ body = io.BytesIO(content_str)
+
+ # mock a stream interface
+ raw_resp = urllib3.HTTPResponse(body=body)
+ setattr(raw_resp._fp, 'chunked', True)
+ setattr(raw_resp._fp, 'chunk_left', len(body.getvalue()) - 1)
+
+ # pass `decode=False` to the helper
+ raw_resp._fp.seek(0)
+ resp = response(status_code=status_code, content=content, raw=raw_resp)
+ result = next(self.client._stream_helper(resp))
+ assert result == content_str
+
+ # pass `decode=True` to the helper
+ raw_resp._fp.seek(0)
+ resp = response(status_code=status_code, content=content, raw=raw_resp)
+ result = next(self.client._stream_helper(resp, decode=True))
+ assert result == content
+
+ # non-chunked response, pass `decode=False` to the helper
+ setattr(raw_resp._fp, 'chunked', False)
+ raw_resp._fp.seek(0)
+ resp = response(status_code=status_code, content=content, raw=raw_resp)
+ result = next(self.client._stream_helper(resp))
+ assert result == content_str.decode('utf-8')
+
+ # non-chunked response, pass `decode=True` to the helper
+ raw_resp._fp.seek(0)
+ resp = response(status_code=status_code, content=content, raw=raw_resp)
+ result = next(self.client._stream_helper(resp, decode=True))
+ assert result == content
+
+
+class UnixSocketStreamTest(unittest.TestCase):
+ def setUp(self):
+ socket_dir = tempfile.mkdtemp()
+ self.build_context = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, socket_dir)
+ self.addCleanup(shutil.rmtree, self.build_context)
+ self.socket_file = os.path.join(socket_dir, 'test_sock.sock')
+ self.server_socket = self._setup_socket()
+ self.stop_server = False
+ server_thread = threading.Thread(target=self.run_server)
+ server_thread.daemon = True
+ server_thread.start()
+ self.response = None
+ self.request_handler = None
+ self.addCleanup(server_thread.join)
+ self.addCleanup(self.stop)
+
+ def stop(self):
+ self.stop_server = True
+
+ def _setup_socket(self):
+ server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ server_sock.bind(self.socket_file)
+ # Non-blocking mode so that we can shut the test down easily
+ server_sock.setblocking(0)
+ server_sock.listen(5)
+ return server_sock
+
+ def run_server(self):
+ try:
+ while not self.stop_server:
+ try:
+ connection, client_address = self.server_socket.accept()
+ except socket.error:
+ # Probably no connection to accept yet
+ time.sleep(0.01)
+ continue
+
+ connection.setblocking(1)
+ try:
+ self.request_handler(connection)
+ finally:
+ connection.close()
+ finally:
+ self.server_socket.close()
+
+ def early_response_sending_handler(self, connection):
+ data = b''
+ headers = None
+
+ connection.sendall(self.response)
+ while not headers:
+ data += connection.recv(2048)
+ parts = data.split(b'\r\n\r\n', 1)
+ if len(parts) == 2:
+ headers, data = parts
+
+ mo = re.search(r'Content-Length: ([0-9]+)', headers.decode())
+ assert mo
+ content_length = int(mo.group(1))
+
+ while True:
+ if len(data) >= content_length:
+ break
+
+ data += connection.recv(2048)
+
+ @pytest.mark.skipif(
+ constants.IS_WINDOWS_PLATFORM, reason='Unix only'
+ )
+ def test_early_stream_response(self):
+ self.request_handler = self.early_response_sending_handler
+ lines = []
+ for i in range(0, 50):
+ line = str(i).encode()
+ lines += [('%x' % len(line)).encode(), line]
+ lines.append(b'0')
+ lines.append(b'')
+
+ self.response = (
+ b'HTTP/1.1 200 OK\r\n'
+ b'Transfer-Encoding: chunked\r\n'
+ b'\r\n'
+ ) + b'\r\n'.join(lines)
+
+ with APIClient(
+ base_url="http+unix://" + self.socket_file,
+ version=DEFAULT_DOCKER_API_VERSION) as client:
+ for i in range(5):
+ try:
+ params = {
+ 't': None,
+ 'remote': None,
+ 'q': False,
+ 'nocache': False,
+ 'rm': False,
+ 'forcerm': False,
+ 'pull': False,
+ 'dockerfile': 'Dockerfile',
+ }
+ headers = {'Content-Type': 'application/tar'}
+ data = b'...'
+ response = client._post(client._url('/build'), params=params, headers=headers, data=data, stream=True)
+ stream = client._stream_helper(response, decode=False)
+ break
+ except requests.ConnectionError as e:
+ if i == 4:
+ raise e
+
+ assert list(stream) == [
+ str(i).encode() for i in range(50)
+ ]
+
+
+@pytest.mark.skip(
+ 'This test requires starting a networking server and tries to access it. '
+ 'This does not work with network separation with Docker-based unit tests, '
+ 'but it does work with podman-based unit tests.'
+)
+class TCPSocketStreamTest(unittest.TestCase):
+ stdout_data = b'''
+ Now, those children out there, they're jumping through the
+ flames in the hope that the god of the fire will make them fruitful.
+ Really, you can't blame them. After all, what girl would not prefer the
+ child of a god to that of some acne-scarred artisan?
+ '''
+ stderr_data = b'''
+ And what of the true God? To whose glory churches and monasteries have been
+ built on these islands for generations past? Now shall what of Him?
+ '''
+
+ @classmethod
+ def setup_class(cls):
+ cls.server = six.moves.socketserver.ThreadingTCPServer(
+ ('', 0), cls.get_handler_class())
+ cls.thread = threading.Thread(target=cls.server.serve_forever)
+ cls.thread.daemon = True
+ cls.thread.start()
+ cls.address = 'http://{0}:{1}'.format(
+ socket.gethostname(), cls.server.server_address[1])
+
+ @classmethod
+ def teardown_class(cls):
+ cls.server.shutdown()
+ cls.server.server_close()
+ cls.thread.join()
+
+ @classmethod
+ def get_handler_class(cls):
+ stdout_data = cls.stdout_data
+ stderr_data = cls.stderr_data
+
+ class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object):
+ def do_POST(self):
+ resp_data = self.get_resp_data()
+ self.send_response(101)
+ self.send_header(
+ 'Content-Type', 'application/vnd.docker.raw-stream')
+ self.send_header('Connection', 'Upgrade')
+ self.send_header('Upgrade', 'tcp')
+ self.end_headers()
+ self.wfile.flush()
+ time.sleep(0.2)
+ self.wfile.write(resp_data)
+ self.wfile.flush()
+
+ def get_resp_data(self):
+ path = self.path.split('/')[-1]
+ if path == 'tty':
+ return stdout_data + stderr_data
+ elif path == 'no-tty':
+ data = b''
+ data += self.frame_header(1, stdout_data)
+ data += stdout_data
+ data += self.frame_header(2, stderr_data)
+ data += stderr_data
+ return data
+ else:
+ raise Exception('Unknown path {path}'.format(path=path))
+
+ @staticmethod
+ def frame_header(stream, data):
+ return struct.pack('>BxxxL', stream, len(data))
+
+ return Handler
+
+ def request(self, stream=None, tty=None, demux=None):
+ assert stream is not None and tty is not None and demux is not None
+ with APIClient(
+ base_url=self.address,
+ version=DEFAULT_DOCKER_API_VERSION,
+ ) as client:
+ if tty:
+ url = client._url('/tty')
+ else:
+ url = client._url('/no-tty')
+ resp = client._post(url, stream=True)
+ return client._read_from_socket(
+ resp, stream=stream, tty=tty, demux=demux)
+
+ def test_read_from_socket_tty(self):
+ res = self.request(stream=True, tty=True, demux=False)
+ assert next(res) == self.stdout_data + self.stderr_data
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_tty_demux(self):
+ res = self.request(stream=True, tty=True, demux=True)
+ assert next(res) == (self.stdout_data + self.stderr_data, None)
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_no_tty(self):
+ res = self.request(stream=True, tty=False, demux=False)
+ assert next(res) == self.stdout_data
+ assert next(res) == self.stderr_data
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_no_tty_demux(self):
+ res = self.request(stream=True, tty=False, demux=True)
+ assert (self.stdout_data, None) == next(res)
+ assert (None, self.stderr_data) == next(res)
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_no_stream_tty(self):
+ res = self.request(stream=False, tty=True, demux=False)
+ assert res == self.stdout_data + self.stderr_data
+
+ def test_read_from_socket_no_stream_tty_demux(self):
+ res = self.request(stream=False, tty=True, demux=True)
+ assert res == (self.stdout_data + self.stderr_data, None)
+
+ def test_read_from_socket_no_stream_no_tty(self):
+ res = self.request(stream=False, tty=False, demux=False)
+ res == self.stdout_data + self.stderr_data
+
+ def test_read_from_socket_no_stream_no_tty_demux(self):
+ res = self.request(stream=False, tty=False, demux=True)
+ assert res == (self.stdout_data, self.stderr_data)
+
+
+class UserAgentTest(unittest.TestCase):
+ def setUp(self):
+ self.patcher = mock.patch.object(
+ APIClient,
+ 'send',
+ return_value=fake_resp("GET", "%s/version" % fake_api.prefix)
+ )
+ self.mock_send = self.patcher.start()
+
+ def tearDown(self):
+ self.patcher.stop()
+
+ def test_default_user_agent(self):
+ client = APIClient(version=DEFAULT_DOCKER_API_VERSION)
+ client.version()
+
+ assert self.mock_send.call_count == 1
+ headers = self.mock_send.call_args[0][0].headers
+ expected = 'ansible-community.docker'
+ assert headers['User-Agent'] == expected
+
+ def test_custom_user_agent(self):
+ client = APIClient(
+ user_agent='foo/bar',
+ version=DEFAULT_DOCKER_API_VERSION)
+ client.version()
+
+ assert self.mock_send.call_count == 1
+ headers = self.mock_send.call_args[0][0].headers
+ assert headers['User-Agent'] == 'foo/bar'
+
+
+class DisableSocketTest(unittest.TestCase):
+ class DummySocket:
+ def __init__(self, timeout=60):
+ self.timeout = timeout
+
+ def settimeout(self, timeout):
+ self.timeout = timeout
+
+ def gettimeout(self):
+ return self.timeout
+
+ def setUp(self):
+ self.client = APIClient(version=DEFAULT_DOCKER_API_VERSION)
+
+ def test_disable_socket_timeout(self):
+ """Test that the timeout is disabled on a generic socket object."""
+ socket = self.DummySocket()
+
+ self.client._disable_socket_timeout(socket)
+
+ assert socket.timeout is None
+
+ def test_disable_socket_timeout2(self):
+ """Test that the timeouts are disabled on a generic socket object
+ and it's _sock object if present."""
+ socket = self.DummySocket()
+ socket._sock = self.DummySocket()
+
+ self.client._disable_socket_timeout(socket)
+
+ assert socket.timeout is None
+ assert socket._sock.timeout is None
+
+ def test_disable_socket_timout_non_blocking(self):
+ """Test that a non-blocking socket does not get set to blocking."""
+ socket = self.DummySocket()
+ socket._sock = self.DummySocket(0.0)
+
+ self.client._disable_socket_timeout(socket)
+
+ assert socket.timeout is None
+ assert socket._sock.timeout == 0.0
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_api.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_api.py
new file mode 100644
index 00000000..b794ff3d
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_api.py
@@ -0,0 +1,668 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible_collections.community.docker.plugins.module_utils._api import constants
+
+from . import fake_stat
+
+CURRENT_VERSION = 'v{api_version}'.format(api_version=constants.DEFAULT_DOCKER_API_VERSION)
+
+FAKE_CONTAINER_ID = '3cc2351ab11b'
+FAKE_IMAGE_ID = 'e9aa60c60128'
+FAKE_EXEC_ID = 'd5d177f121dc'
+FAKE_NETWORK_ID = '33fb6a3462b8'
+FAKE_IMAGE_NAME = 'test_image'
+FAKE_TARBALL_PATH = '/path/to/tarball'
+FAKE_REPO_NAME = 'repo'
+FAKE_TAG_NAME = 'tag'
+FAKE_FILE_NAME = 'file'
+FAKE_URL = 'myurl'
+FAKE_PATH = '/path'
+FAKE_VOLUME_NAME = 'perfectcherryblossom'
+FAKE_NODE_ID = '24ifsmvkjbyhk'
+FAKE_SECRET_ID = 'epdyrw4tsi03xy3deu8g8ly6o'
+FAKE_SECRET_NAME = 'super_secret'
+
+# Each method is prefixed with HTTP method (get, post...)
+# for clarity and readability
+
+
+def get_fake_version():
+ status_code = 200
+ response = {
+ 'ApiVersion': '1.35',
+ 'Arch': 'amd64',
+ 'BuildTime': '2018-01-10T20:09:37.000000000+00:00',
+ 'Components': [{
+ 'Details': {
+ 'ApiVersion': '1.35',
+ 'Arch': 'amd64',
+ 'BuildTime': '2018-01-10T20:09:37.000000000+00:00',
+ 'Experimental': 'false',
+ 'GitCommit': '03596f5',
+ 'GoVersion': 'go1.9.2',
+ 'KernelVersion': '4.4.0-112-generic',
+ 'MinAPIVersion': '1.12',
+ 'Os': 'linux'
+ },
+ 'Name': 'Engine',
+ 'Version': '18.01.0-ce'
+ }],
+ 'GitCommit': '03596f5',
+ 'GoVersion': 'go1.9.2',
+ 'KernelVersion': '4.4.0-112-generic',
+ 'MinAPIVersion': '1.12',
+ 'Os': 'linux',
+ 'Platform': {'Name': ''},
+ 'Version': '18.01.0-ce'
+ }
+
+ return status_code, response
+
+
+def get_fake_info():
+ status_code = 200
+ response = {'Containers': 1, 'Images': 1, 'Debug': False,
+ 'MemoryLimit': False, 'SwapLimit': False,
+ 'IPv4Forwarding': True}
+ return status_code, response
+
+
+def post_fake_auth():
+ status_code = 200
+ response = {'Status': 'Login Succeeded',
+ 'IdentityToken': '9cbaf023786cd7'}
+ return status_code, response
+
+
+def get_fake_ping():
+ return 200, "OK"
+
+
+def get_fake_search():
+ status_code = 200
+ response = [{'Name': 'busybox', 'Description': 'Fake Description'}]
+ return status_code, response
+
+
+def get_fake_images():
+ status_code = 200
+ response = [{
+ 'Id': FAKE_IMAGE_ID,
+ 'Created': '2 days ago',
+ 'Repository': 'busybox',
+ 'RepoTags': ['busybox:latest', 'busybox:1.0'],
+ }]
+ return status_code, response
+
+
+def get_fake_image_history():
+ status_code = 200
+ response = [
+ {
+ "Id": "b750fe79269d",
+ "Created": 1364102658,
+ "CreatedBy": "/bin/bash"
+ },
+ {
+ "Id": "27cf78414709",
+ "Created": 1364068391,
+ "CreatedBy": ""
+ }
+ ]
+
+ return status_code, response
+
+
+def post_fake_import_image():
+ status_code = 200
+ response = 'Import messages...'
+
+ return status_code, response
+
+
+def get_fake_containers():
+ status_code = 200
+ response = [{
+ 'Id': FAKE_CONTAINER_ID,
+ 'Image': 'busybox:latest',
+ 'Created': '2 days ago',
+ 'Command': 'true',
+ 'Status': 'fake status'
+ }]
+ return status_code, response
+
+
+def post_fake_start_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_resize_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_create_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def get_fake_inspect_container(tty=False):
+ status_code = 200
+ response = {
+ 'Id': FAKE_CONTAINER_ID,
+ 'Config': {'Labels': {'foo': 'bar'}, 'Privileged': True, 'Tty': tty},
+ 'ID': FAKE_CONTAINER_ID,
+ 'Image': 'busybox:latest',
+ 'Name': 'foobar',
+ "State": {
+ "Status": "running",
+ "Running": True,
+ "Pid": 0,
+ "ExitCode": 0,
+ "StartedAt": "2013-09-25T14:01:18.869545111+02:00",
+ "Ghost": False
+ },
+ "HostConfig": {
+ "LogConfig": {
+ "Type": "json-file",
+ "Config": {}
+ },
+ },
+ "MacAddress": "02:42:ac:11:00:0a"
+ }
+ return status_code, response
+
+
+def get_fake_inspect_image():
+ status_code = 200
+ response = {
+ 'Id': FAKE_IMAGE_ID,
+ 'Parent': "27cf784147099545",
+ 'Created': "2013-03-23T22:24:18.818426-07:00",
+ 'Container': FAKE_CONTAINER_ID,
+ 'Config': {'Labels': {'bar': 'foo'}},
+ 'ContainerConfig':
+ {
+ "Hostname": "",
+ "User": "",
+ "Memory": 0,
+ "MemorySwap": 0,
+ "AttachStdin": False,
+ "AttachStdout": False,
+ "AttachStderr": False,
+ "PortSpecs": "",
+ "Tty": True,
+ "OpenStdin": True,
+ "StdinOnce": False,
+ "Env": "",
+ "Cmd": ["/bin/bash"],
+ "Dns": "",
+ "Image": "base",
+ "Volumes": "",
+ "VolumesFrom": "",
+ "WorkingDir": ""
+ },
+ 'Size': 6823592
+ }
+ return status_code, response
+
+
+def get_fake_insert_image():
+ status_code = 200
+ response = {'StatusCode': 0}
+ return status_code, response
+
+
+def get_fake_wait():
+ status_code = 200
+ response = {'StatusCode': 0}
+ return status_code, response
+
+
+def get_fake_logs():
+ status_code = 200
+ response = (b'\x01\x00\x00\x00\x00\x00\x00\x00'
+ b'\x02\x00\x00\x00\x00\x00\x00\x00'
+ b'\x01\x00\x00\x00\x00\x00\x00\x11Flowering Nights\n'
+ b'\x01\x00\x00\x00\x00\x00\x00\x10(Sakuya Iyazoi)\n')
+ return status_code, response
+
+
+def get_fake_diff():
+ status_code = 200
+ response = [{'Path': '/test', 'Kind': 1}]
+ return status_code, response
+
+
+def get_fake_events():
+ status_code = 200
+ response = [{'status': 'stop', 'id': FAKE_CONTAINER_ID,
+ 'from': FAKE_IMAGE_ID, 'time': 1423247867}]
+ return status_code, response
+
+
+def get_fake_export():
+ status_code = 200
+ response = 'Byte Stream....'
+ return status_code, response
+
+
+def post_fake_exec_create():
+ status_code = 200
+ response = {'Id': FAKE_EXEC_ID}
+ return status_code, response
+
+
+def post_fake_exec_start():
+ status_code = 200
+ response = (b'\x01\x00\x00\x00\x00\x00\x00\x11bin\nboot\ndev\netc\n'
+ b'\x01\x00\x00\x00\x00\x00\x00\x12lib\nmnt\nproc\nroot\n'
+ b'\x01\x00\x00\x00\x00\x00\x00\x0csbin\nusr\nvar\n')
+ return status_code, response
+
+
+def post_fake_exec_resize():
+ status_code = 201
+ return status_code, ''
+
+
+def get_fake_exec_inspect():
+ return 200, {
+ 'OpenStderr': True,
+ 'OpenStdout': True,
+ 'Container': get_fake_inspect_container()[1],
+ 'Running': False,
+ 'ProcessConfig': {
+ 'arguments': ['hello world'],
+ 'tty': False,
+ 'entrypoint': 'echo',
+ 'privileged': False,
+ 'user': ''
+ },
+ 'ExitCode': 0,
+ 'ID': FAKE_EXEC_ID,
+ 'OpenStdin': False
+ }
+
+
+def post_fake_stop_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_kill_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_pause_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_unpause_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_restart_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_rename_container():
+ status_code = 204
+ return status_code, None
+
+
+def delete_fake_remove_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_image_create():
+ status_code = 200
+ response = {'Id': FAKE_IMAGE_ID}
+ return status_code, response
+
+
+def delete_fake_remove_image():
+ status_code = 200
+ response = {'Id': FAKE_IMAGE_ID}
+ return status_code, response
+
+
+def get_fake_get_image():
+ status_code = 200
+ response = 'Byte Stream....'
+ return status_code, response
+
+
+def post_fake_load_image():
+ status_code = 200
+ response = {'Id': FAKE_IMAGE_ID}
+ return status_code, response
+
+
+def post_fake_commit():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_push():
+ status_code = 200
+ response = {'Id': FAKE_IMAGE_ID}
+ return status_code, response
+
+
+def post_fake_build_container():
+ status_code = 200
+ response = {'Id': FAKE_CONTAINER_ID}
+ return status_code, response
+
+
+def post_fake_tag_image():
+ status_code = 200
+ response = {'Id': FAKE_IMAGE_ID}
+ return status_code, response
+
+
+def get_fake_stats():
+ status_code = 200
+ response = fake_stat.OBJ
+ return status_code, response
+
+
+def get_fake_top():
+ return 200, {
+ 'Processes': [
+ [
+ 'root',
+ '26501',
+ '6907',
+ '0',
+ '10:32',
+ 'pts/55',
+ '00:00:00',
+ 'sleep 60',
+ ],
+ ],
+ 'Titles': [
+ 'UID',
+ 'PID',
+ 'PPID',
+ 'C',
+ 'STIME',
+ 'TTY',
+ 'TIME',
+ 'CMD',
+ ],
+ }
+
+
+def get_fake_volume_list():
+ status_code = 200
+ response = {
+ 'Volumes': [
+ {
+ 'Name': 'perfectcherryblossom',
+ 'Driver': 'local',
+ 'Mountpoint': '/var/lib/docker/volumes/perfectcherryblossom',
+ 'Scope': 'local'
+ }, {
+ 'Name': 'subterraneananimism',
+ 'Driver': 'local',
+ 'Mountpoint': '/var/lib/docker/volumes/subterraneananimism',
+ 'Scope': 'local'
+ }
+ ]
+ }
+ return status_code, response
+
+
+def get_fake_volume():
+ status_code = 200
+ response = {
+ 'Name': 'perfectcherryblossom',
+ 'Driver': 'local',
+ 'Mountpoint': '/var/lib/docker/volumes/perfectcherryblossom',
+ 'Labels': {
+ 'com.example.some-label': 'some-value'
+ },
+ 'Scope': 'local'
+ }
+ return status_code, response
+
+
+def fake_remove_volume():
+ return 204, None
+
+
+def post_fake_update_container():
+ return 200, {'Warnings': []}
+
+
+def post_fake_update_node():
+ return 200, None
+
+
+def post_fake_join_swarm():
+ return 200, None
+
+
+def get_fake_network_list():
+ return 200, [{
+ "Name": "bridge",
+ "Id": FAKE_NETWORK_ID,
+ "Scope": "local",
+ "Driver": "bridge",
+ "EnableIPv6": False,
+ "Internal": False,
+ "IPAM": {
+ "Driver": "default",
+ "Config": [
+ {
+ "Subnet": "172.17.0.0/16"
+ }
+ ]
+ },
+ "Containers": {
+ FAKE_CONTAINER_ID: {
+ "EndpointID": "ed2419a97c1d99",
+ "MacAddress": "02:42:ac:11:00:02",
+ "IPv4Address": "172.17.0.2/16",
+ "IPv6Address": ""
+ }
+ },
+ "Options": {
+ "com.docker.network.bridge.default_bridge": "true",
+ "com.docker.network.bridge.enable_icc": "true",
+ "com.docker.network.bridge.enable_ip_masquerade": "true",
+ "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
+ "com.docker.network.bridge.name": "docker0",
+ "com.docker.network.driver.mtu": "1500"
+ }
+ }]
+
+
+def get_fake_network():
+ return 200, get_fake_network_list()[1][0]
+
+
+def post_fake_network():
+ return 201, {"Id": FAKE_NETWORK_ID, "Warnings": []}
+
+
+def delete_fake_network():
+ return 204, None
+
+
+def post_fake_network_connect():
+ return 200, None
+
+
+def post_fake_network_disconnect():
+ return 200, None
+
+
+def post_fake_secret():
+ status_code = 200
+ response = {'ID': FAKE_SECRET_ID}
+ return status_code, response
+
+
+# Maps real api url to fake response callback
+prefix = 'http+docker://localhost'
+if constants.IS_WINDOWS_PLATFORM:
+ prefix = 'http+docker://localnpipe'
+
+fake_responses = {
+ '{prefix}/version'.format(prefix=prefix):
+ get_fake_version,
+ '{prefix}/{CURRENT_VERSION}/version'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_version,
+ '{prefix}/{CURRENT_VERSION}/info'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_info,
+ '{prefix}/{CURRENT_VERSION}/auth'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_auth,
+ '{prefix}/{CURRENT_VERSION}/_ping'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_ping,
+ '{prefix}/{CURRENT_VERSION}/images/search'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_search,
+ '{prefix}/{CURRENT_VERSION}/images/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_images,
+ '{prefix}/{CURRENT_VERSION}/images/test_image/history'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_image_history,
+ '{prefix}/{CURRENT_VERSION}/images/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_import_image,
+ '{prefix}/{CURRENT_VERSION}/containers/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_containers,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/start'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_start_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/resize'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_resize_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_inspect_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/rename'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_rename_container,
+ '{prefix}/{CURRENT_VERSION}/images/e9aa60c60128/tag'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_tag_image,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/wait'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_wait,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/logs'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_logs,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/changes'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_diff,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/export'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_export,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/update'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_update_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/exec'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_exec_create,
+ '{prefix}/{CURRENT_VERSION}/exec/d5d177f121dc/start'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_exec_start,
+ '{prefix}/{CURRENT_VERSION}/exec/d5d177f121dc/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_exec_inspect,
+ '{prefix}/{CURRENT_VERSION}/exec/d5d177f121dc/resize'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_exec_resize,
+
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/stats'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_stats,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/top'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_top,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/stop'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_stop_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/kill'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_kill_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/pause'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_pause_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/unpause'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_unpause_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b/restart'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_restart_container,
+ '{prefix}/{CURRENT_VERSION}/containers/3cc2351ab11b'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ delete_fake_remove_container,
+ '{prefix}/{CURRENT_VERSION}/images/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_image_create,
+ '{prefix}/{CURRENT_VERSION}/images/e9aa60c60128'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ delete_fake_remove_image,
+ '{prefix}/{CURRENT_VERSION}/images/e9aa60c60128/get'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_get_image,
+ '{prefix}/{CURRENT_VERSION}/images/load'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_load_image,
+ '{prefix}/{CURRENT_VERSION}/images/test_image/json'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_inspect_image,
+ '{prefix}/{CURRENT_VERSION}/images/test_image/insert'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_insert_image,
+ '{prefix}/{CURRENT_VERSION}/images/test_image/push'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_push,
+ '{prefix}/{CURRENT_VERSION}/commit'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_commit,
+ '{prefix}/{CURRENT_VERSION}/containers/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_create_container,
+ '{prefix}/{CURRENT_VERSION}/build'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_build_container,
+ '{prefix}/{CURRENT_VERSION}/events'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ get_fake_events,
+ ('{prefix}/{CURRENT_VERSION}/volumes'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'GET'):
+ get_fake_volume_list,
+ ('{prefix}/{CURRENT_VERSION}/volumes/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'POST'):
+ get_fake_volume,
+ ('{1}/{0}/volumes/{2}'.format(
+ CURRENT_VERSION, prefix, FAKE_VOLUME_NAME
+ ), 'GET'):
+ get_fake_volume,
+ ('{1}/{0}/volumes/{2}'.format(
+ CURRENT_VERSION, prefix, FAKE_VOLUME_NAME
+ ), 'DELETE'):
+ fake_remove_volume,
+ ('{1}/{0}/nodes/{2}/update?version=1'.format(
+ CURRENT_VERSION, prefix, FAKE_NODE_ID
+ ), 'POST'):
+ post_fake_update_node,
+ ('{prefix}/{CURRENT_VERSION}/swarm/join'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'POST'):
+ post_fake_join_swarm,
+ ('{prefix}/{CURRENT_VERSION}/networks'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'GET'):
+ get_fake_network_list,
+ ('{prefix}/{CURRENT_VERSION}/networks/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION), 'POST'):
+ post_fake_network,
+ ('{1}/{0}/networks/{2}'.format(
+ CURRENT_VERSION, prefix, FAKE_NETWORK_ID
+ ), 'GET'):
+ get_fake_network,
+ ('{1}/{0}/networks/{2}'.format(
+ CURRENT_VERSION, prefix, FAKE_NETWORK_ID
+ ), 'DELETE'):
+ delete_fake_network,
+ ('{1}/{0}/networks/{2}/connect'.format(
+ CURRENT_VERSION, prefix, FAKE_NETWORK_ID
+ ), 'POST'):
+ post_fake_network_connect,
+ ('{1}/{0}/networks/{2}/disconnect'.format(
+ CURRENT_VERSION, prefix, FAKE_NETWORK_ID
+ ), 'POST'):
+ post_fake_network_disconnect,
+ '{prefix}/{CURRENT_VERSION}/secrets/create'.format(prefix=prefix, CURRENT_VERSION=CURRENT_VERSION):
+ post_fake_secret,
+}
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_stat.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_stat.py
new file mode 100644
index 00000000..97547328
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/fake_stat.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+OBJ = {
+ "read": "2015-02-11T19:20:46.667237763+02:00",
+ "network": {
+ "rx_bytes": 567224,
+ "rx_packets": 3773,
+ "rx_errors": 0,
+ "rx_dropped": 0,
+ "tx_bytes": 1176,
+ "tx_packets": 13,
+ "tx_errors": 0,
+ "tx_dropped": 0
+ },
+ "cpu_stats": {
+ "cpu_usage": {
+ "total_usage": 157260874053,
+ "percpu_usage": [
+ 52196306950,
+ 24118413549,
+ 53292684398,
+ 27653469156
+ ],
+ "usage_in_kernelmode": 37140000000,
+ "usage_in_usermode": 62140000000
+ },
+ "system_cpu_usage": 3.0881377e+14,
+ "throttling_data": {
+ "periods": 0,
+ "throttled_periods": 0,
+ "throttled_time": 0
+ }
+ },
+ "memory_stats": {
+ "usage": 179314688,
+ "max_usage": 258166784,
+ "stats": {
+ "active_anon": 90804224,
+ "active_file": 2195456,
+ "cache": 3096576,
+ "hierarchical_memory_limit": 1.844674407371e+19,
+ "inactive_anon": 85516288,
+ "inactive_file": 798720,
+ "mapped_file": 2646016,
+ "pgfault": 101034,
+ "pgmajfault": 1207,
+ "pgpgin": 115814,
+ "pgpgout": 75613,
+ "rss": 176218112,
+ "rss_huge": 12582912,
+ "total_active_anon": 90804224,
+ "total_active_file": 2195456,
+ "total_cache": 3096576,
+ "total_inactive_anon": 85516288,
+ "total_inactive_file": 798720,
+ "total_mapped_file": 2646016,
+ "total_pgfault": 101034,
+ "total_pgmajfault": 1207,
+ "total_pgpgin": 115814,
+ "total_pgpgout": 75613,
+ "total_rss": 176218112,
+ "total_rss_huge": 12582912,
+ "total_unevictable": 0,
+ "total_writeback": 0,
+ "unevictable": 0,
+ "writeback": 0
+ },
+ "failcnt": 0,
+ "limit": 8039038976
+ },
+ "blkio_stats": {
+ "io_service_bytes_recursive": [
+ {
+ "major": 8,
+ "minor": 0,
+ "op": "Read",
+ "value": 72843264
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Write",
+ "value": 4096
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Sync",
+ "value": 4096
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Async",
+ "value": 72843264
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Total",
+ "value": 72847360
+ }
+ ],
+ "io_serviced_recursive": [
+ {
+ "major": 8,
+ "minor": 0,
+ "op": "Read",
+ "value": 10581
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Write",
+ "value": 1
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Sync",
+ "value": 1
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Async",
+ "value": 10581
+ }, {
+ "major": 8,
+ "minor": 0,
+ "op": "Total",
+ "value": 10582
+ }
+ ],
+ "io_queue_recursive": [],
+ "io_service_time_recursive": [],
+ "io_wait_time_recursive": [],
+ "io_merged_recursive": [],
+ "io_time_recursive": [],
+ "sectors_recursive": []
+ }
+}
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_auth.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_auth.py
new file mode 100644
index 00000000..b3b5a351
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_auth.py
@@ -0,0 +1,819 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import base64
+import json
+import os
+import os.path
+import random
+import shutil
+import tempfile
+import unittest
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api import auth, errors
+from ansible_collections.community.docker.plugins.module_utils._api.credentials.errors import CredentialsNotFound
+from ansible_collections.community.docker.plugins.module_utils._api.credentials.store import Store
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class RegressionTest(unittest.TestCase):
+ def test_803_urlsafe_encode(self):
+ auth_data = {
+ 'username': 'root',
+ 'password': 'GR?XGR?XGR?XGR?X'
+ }
+ encoded = auth.encode_header(auth_data)
+ assert b'/' not in encoded
+ assert b'_' in encoded
+
+
+class ResolveRepositoryNameTest(unittest.TestCase):
+ def test_resolve_repository_name_hub_library_image(self):
+ assert auth.resolve_repository_name('image') == (
+ 'docker.io', 'image'
+ )
+
+ def test_resolve_repository_name_dotted_hub_library_image(self):
+ assert auth.resolve_repository_name('image.valid') == (
+ 'docker.io', 'image.valid'
+ )
+
+ def test_resolve_repository_name_hub_image(self):
+ assert auth.resolve_repository_name('username/image') == (
+ 'docker.io', 'username/image'
+ )
+
+ def test_explicit_hub_index_library_image(self):
+ assert auth.resolve_repository_name('docker.io/image') == (
+ 'docker.io', 'image'
+ )
+
+ def test_explicit_legacy_hub_index_library_image(self):
+ assert auth.resolve_repository_name('index.docker.io/image') == (
+ 'docker.io', 'image'
+ )
+
+ def test_resolve_repository_name_private_registry(self):
+ assert auth.resolve_repository_name('my.registry.net/image') == (
+ 'my.registry.net', 'image'
+ )
+
+ def test_resolve_repository_name_private_registry_with_port(self):
+ assert auth.resolve_repository_name('my.registry.net:5000/image') == (
+ 'my.registry.net:5000', 'image'
+ )
+
+ def test_resolve_repository_name_private_registry_with_username(self):
+ assert auth.resolve_repository_name(
+ 'my.registry.net/username/image'
+ ) == ('my.registry.net', 'username/image')
+
+ def test_resolve_repository_name_no_dots_but_port(self):
+ assert auth.resolve_repository_name('hostname:5000/image') == (
+ 'hostname:5000', 'image'
+ )
+
+ def test_resolve_repository_name_no_dots_but_port_and_username(self):
+ assert auth.resolve_repository_name(
+ 'hostname:5000/username/image'
+ ) == ('hostname:5000', 'username/image')
+
+ def test_resolve_repository_name_localhost(self):
+ assert auth.resolve_repository_name('localhost/image') == (
+ 'localhost', 'image'
+ )
+
+ def test_resolve_repository_name_localhost_with_username(self):
+ assert auth.resolve_repository_name('localhost/username/image') == (
+ 'localhost', 'username/image'
+ )
+
+ def test_invalid_index_name(self):
+ with pytest.raises(errors.InvalidRepository):
+ auth.resolve_repository_name('-gecko.com/image')
+
+
+def encode_auth(auth_info):
+ return base64.b64encode(
+ auth_info.get('username', '').encode('utf-8') + b':' +
+ auth_info.get('password', '').encode('utf-8'))
+
+
+class ResolveAuthTest(unittest.TestCase):
+ index_config = {'auth': encode_auth({'username': 'indexuser'})}
+ private_config = {'auth': encode_auth({'username': 'privateuser'})}
+ legacy_config = {'auth': encode_auth({'username': 'legacyauth'})}
+
+ auth_config = auth.AuthConfig({
+ 'auths': auth.parse_auth({
+ 'https://index.docker.io/v1/': index_config,
+ 'my.registry.net': private_config,
+ 'http://legacy.registry.url/v1/': legacy_config,
+ })
+ })
+
+ def test_resolve_authconfig_hostname_only(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'my.registry.net'
+ )['username'] == 'privateuser'
+
+ def test_resolve_authconfig_no_protocol(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'my.registry.net/v1/'
+ )['username'] == 'privateuser'
+
+ def test_resolve_authconfig_no_path(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'http://my.registry.net'
+ )['username'] == 'privateuser'
+
+ def test_resolve_authconfig_no_path_trailing_slash(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'http://my.registry.net/'
+ )['username'] == 'privateuser'
+
+ def test_resolve_authconfig_no_path_wrong_secure_proto(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'https://my.registry.net'
+ )['username'] == 'privateuser'
+
+ def test_resolve_authconfig_no_path_wrong_insecure_proto(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'http://index.docker.io'
+ )['username'] == 'indexuser'
+
+ def test_resolve_authconfig_path_wrong_proto(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'https://my.registry.net/v1/'
+ )['username'] == 'privateuser'
+
+ def test_resolve_authconfig_default_registry(self):
+ assert auth.resolve_authconfig(
+ self.auth_config
+ )['username'] == 'indexuser'
+
+ def test_resolve_authconfig_default_explicit_none(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, None
+ )['username'] == 'indexuser'
+
+ def test_resolve_authconfig_fully_explicit(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'http://my.registry.net/v1/'
+ )['username'] == 'privateuser'
+
+ def test_resolve_authconfig_legacy_config(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'legacy.registry.url'
+ )['username'] == 'legacyauth'
+
+ def test_resolve_authconfig_no_match(self):
+ assert auth.resolve_authconfig(
+ self.auth_config, 'does.not.exist'
+ ) is None
+
+ def test_resolve_registry_and_auth_library_image(self):
+ image = 'image'
+ assert auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ )['username'] == 'indexuser'
+
+ def test_resolve_registry_and_auth_hub_image(self):
+ image = 'username/image'
+ assert auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ )['username'] == 'indexuser'
+
+ def test_resolve_registry_and_auth_explicit_hub(self):
+ image = 'docker.io/username/image'
+ assert auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ )['username'] == 'indexuser'
+
+ def test_resolve_registry_and_auth_explicit_legacy_hub(self):
+ image = 'index.docker.io/username/image'
+ assert auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ )['username'] == 'indexuser'
+
+ def test_resolve_registry_and_auth_private_registry(self):
+ image = 'my.registry.net/image'
+ assert auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ )['username'] == 'privateuser'
+
+ def test_resolve_registry_and_auth_unauthenticated_registry(self):
+ image = 'other.registry.net/image'
+ assert auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ ) is None
+
+ def test_resolve_auth_with_empty_credstore_and_auth_dict(self):
+ auth_config = auth.AuthConfig({
+ 'auths': auth.parse_auth({
+ 'https://index.docker.io/v1/': self.index_config,
+ }),
+ 'credsStore': 'blackbox'
+ })
+ with mock.patch(
+ 'ansible_collections.community.docker.plugins.module_utils._api.auth.AuthConfig._resolve_authconfig_credstore'
+ ) as m:
+ m.return_value = None
+ assert 'indexuser' == auth.resolve_authconfig(
+ auth_config, None
+ )['username']
+
+
+class LoadConfigTest(unittest.TestCase):
+ def test_load_config_no_file(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg = auth.load_config(folder)
+ assert cfg is not None
+
+ def test_load_legacy_config(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg_path = os.path.join(folder, '.dockercfg')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ with open(cfg_path, 'w') as f:
+ f.write('auth = {auth}\n'.format(auth=auth_))
+ f.write('email = sakuya@scarlet.net')
+
+ cfg = auth.load_config(cfg_path)
+ assert auth.resolve_authconfig(cfg) is not None
+ assert cfg.auths[auth.INDEX_NAME] is not None
+ cfg = cfg.auths[auth.INDEX_NAME]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == 'sakuya@scarlet.net'
+ assert cfg.get('Auth') is None
+
+ def test_load_json_config(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg_path = os.path.join(folder, '.dockercfg')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ email = 'sakuya@scarlet.net'
+ with open(cfg_path, 'w') as f:
+ json.dump(
+ {auth.INDEX_URL: {'auth': auth_, 'email': email}}, f
+ )
+ cfg = auth.load_config(cfg_path)
+ assert auth.resolve_authconfig(cfg) is not None
+ assert cfg.auths[auth.INDEX_URL] is not None
+ cfg = cfg.auths[auth.INDEX_URL]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == email
+ assert cfg.get('Auth') is None
+
+ def test_load_modern_json_config(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg_path = os.path.join(folder, 'config.json')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ email = 'sakuya@scarlet.net'
+ with open(cfg_path, 'w') as f:
+ json.dump({
+ 'auths': {
+ auth.INDEX_URL: {
+ 'auth': auth_, 'email': email
+ }
+ }
+ }, f)
+ cfg = auth.load_config(cfg_path)
+ assert auth.resolve_authconfig(cfg) is not None
+ assert cfg.auths[auth.INDEX_URL] is not None
+ cfg = cfg.auths[auth.INDEX_URL]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == email
+
+ def test_load_config_with_random_name(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+
+ dockercfg_path = os.path.join(folder,
+ '.{0}.dockercfg'.format(
+ random.randrange(100000)))
+ registry = 'https://your.private.registry.io'
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ config = {
+ registry: {
+ 'auth': '{auth}'.format(auth=auth_),
+ 'email': 'sakuya@scarlet.net'
+ }
+ }
+
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ cfg = auth.load_config(dockercfg_path).auths
+ assert registry in cfg
+ assert cfg[registry] is not None
+ cfg = cfg[registry]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == 'sakuya@scarlet.net'
+ assert cfg.get('auth') is None
+
+ def test_load_config_custom_config_env(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+
+ dockercfg_path = os.path.join(folder, 'config.json')
+ registry = 'https://your.private.registry.io'
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ config = {
+ registry: {
+ 'auth': '{auth}'.format(auth=auth_),
+ 'email': 'sakuya@scarlet.net'
+ }
+ }
+
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
+ cfg = auth.load_config(None).auths
+ assert registry in cfg
+ assert cfg[registry] is not None
+ cfg = cfg[registry]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == 'sakuya@scarlet.net'
+ assert cfg.get('auth') is None
+
+ def test_load_config_custom_config_env_with_auths(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+
+ dockercfg_path = os.path.join(folder, 'config.json')
+ registry = 'https://your.private.registry.io'
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ config = {
+ 'auths': {
+ registry: {
+ 'auth': '{auth}'.format(auth=auth_),
+ 'email': 'sakuya@scarlet.net'
+ }
+ }
+ }
+
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
+ cfg = auth.load_config(None)
+ assert registry in cfg.auths
+ cfg = cfg.auths[registry]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == 'sakuya@scarlet.net'
+ assert cfg.get('auth') is None
+
+ def test_load_config_custom_config_env_utf8(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+
+ dockercfg_path = os.path.join(folder, 'config.json')
+ registry = 'https://your.private.registry.io'
+ auth_ = base64.b64encode(
+ b'sakuya\xc3\xa6:izayoi\xc3\xa6').decode('ascii')
+ config = {
+ 'auths': {
+ registry: {
+ 'auth': '{auth}'.format(auth=auth_),
+ 'email': 'sakuya@scarlet.net'
+ }
+ }
+ }
+
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
+ cfg = auth.load_config(None)
+ assert registry in cfg.auths
+ cfg = cfg.auths[registry]
+ assert cfg['username'] == b'sakuya\xc3\xa6'.decode('utf8')
+ assert cfg['password'] == b'izayoi\xc3\xa6'.decode('utf8')
+ assert cfg['email'] == 'sakuya@scarlet.net'
+ assert cfg.get('auth') is None
+
+ def test_load_config_unknown_keys(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ dockercfg_path = os.path.join(folder, 'config.json')
+ config = {
+ 'detachKeys': 'ctrl-q, ctrl-u, ctrl-i'
+ }
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ cfg = auth.load_config(dockercfg_path)
+ assert dict(cfg) == {'auths': {}}
+
+ def test_load_config_invalid_auth_dict(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ dockercfg_path = os.path.join(folder, 'config.json')
+ config = {
+ 'auths': {
+ 'scarlet.net': {'sakuya': 'izayoi'}
+ }
+ }
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ cfg = auth.load_config(dockercfg_path)
+ assert dict(cfg) == {'auths': {'scarlet.net': {}}}
+
+ def test_load_config_identity_token(self):
+ folder = tempfile.mkdtemp()
+ registry = 'scarlet.net'
+ token = '1ce1cebb-503e-7043-11aa-7feb8bd4a1ce'
+ self.addCleanup(shutil.rmtree, folder)
+ dockercfg_path = os.path.join(folder, 'config.json')
+ auth_entry = encode_auth({'username': 'sakuya'}).decode('ascii')
+ config = {
+ 'auths': {
+ registry: {
+ 'auth': auth_entry,
+ 'identitytoken': token
+ }
+ }
+ }
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ cfg = auth.load_config(dockercfg_path)
+ assert registry in cfg.auths
+ cfg = cfg.auths[registry]
+ assert 'IdentityToken' in cfg
+ assert cfg['IdentityToken'] == token
+
+
+class CredstoreTest(unittest.TestCase):
+ def setUp(self):
+ self.authconfig = auth.AuthConfig({'credsStore': 'default'})
+ self.default_store = InMemoryStore('default')
+ self.authconfig._stores['default'] = self.default_store
+ self.default_store.store(
+ 'https://gensokyo.jp/v2', 'sakuya', 'izayoi',
+ )
+ self.default_store.store(
+ 'https://default.com/v2', 'user', 'hunter2',
+ )
+
+ def test_get_credential_store(self):
+ auth_config = auth.AuthConfig({
+ 'credHelpers': {
+ 'registry1.io': 'truesecret',
+ 'registry2.io': 'powerlock'
+ },
+ 'credsStore': 'blackbox',
+ })
+
+ assert auth_config.get_credential_store('registry1.io') == 'truesecret'
+ assert auth_config.get_credential_store('registry2.io') == 'powerlock'
+ assert auth_config.get_credential_store('registry3.io') == 'blackbox'
+
+ def test_get_credential_store_no_default(self):
+ auth_config = auth.AuthConfig({
+ 'credHelpers': {
+ 'registry1.io': 'truesecret',
+ 'registry2.io': 'powerlock'
+ },
+ })
+ assert auth_config.get_credential_store('registry2.io') == 'powerlock'
+ assert auth_config.get_credential_store('registry3.io') is None
+
+ def test_get_credential_store_default_index(self):
+ auth_config = auth.AuthConfig({
+ 'credHelpers': {
+ 'https://index.docker.io/v1/': 'powerlock'
+ },
+ 'credsStore': 'truesecret'
+ })
+
+ assert auth_config.get_credential_store(None) == 'powerlock'
+ assert auth_config.get_credential_store('docker.io') == 'powerlock'
+ assert auth_config.get_credential_store('images.io') == 'truesecret'
+
+ def test_get_credential_store_with_plain_dict(self):
+ auth_config = {
+ 'credHelpers': {
+ 'registry1.io': 'truesecret',
+ 'registry2.io': 'powerlock'
+ },
+ 'credsStore': 'blackbox',
+ }
+
+ assert auth.get_credential_store(
+ auth_config, 'registry1.io'
+ ) == 'truesecret'
+ assert auth.get_credential_store(
+ auth_config, 'registry2.io'
+ ) == 'powerlock'
+ assert auth.get_credential_store(
+ auth_config, 'registry3.io'
+ ) == 'blackbox'
+
+ def test_get_all_credentials_credstore_only(self):
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ }
+
+ def test_get_all_credentials_with_empty_credhelper(self):
+ self.authconfig['credHelpers'] = {
+ 'registry1.io': 'truesecret',
+ }
+ self.authconfig._stores['truesecret'] = InMemoryStore()
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'registry1.io': None,
+ }
+
+ def test_get_all_credentials_with_credhelpers_only(self):
+ del self.authconfig['credsStore']
+ assert self.authconfig.get_all_credentials() == {}
+
+ self.authconfig['credHelpers'] = {
+ 'https://gensokyo.jp/v2': 'default',
+ 'https://default.com/v2': 'default',
+ }
+
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ }
+
+ def test_get_all_credentials_with_auths_entries(self):
+ self.authconfig.add_auth('registry1.io', {
+ 'ServerAddress': 'registry1.io',
+ 'Username': 'reimu',
+ 'Password': 'hakurei',
+ })
+
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'registry1.io': {
+ 'ServerAddress': 'registry1.io',
+ 'Username': 'reimu',
+ 'Password': 'hakurei',
+ },
+ }
+
+ def test_get_all_credentials_with_empty_auths_entry(self):
+ self.authconfig.add_auth('default.com', {})
+
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ }
+
+ def test_get_all_credentials_credstore_overrides_auth_entry(self):
+ self.authconfig.add_auth('default.com', {
+ 'Username': 'shouldnotsee',
+ 'Password': 'thisentry',
+ 'ServerAddress': 'https://default.com/v2',
+ })
+
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ }
+
+ def test_get_all_credentials_helpers_override_default(self):
+ self.authconfig['credHelpers'] = {
+ 'https://default.com/v2': 'truesecret',
+ }
+ truesecret = InMemoryStore('truesecret')
+ truesecret.store('https://default.com/v2', 'reimu', 'hakurei')
+ self.authconfig._stores['truesecret'] = truesecret
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'reimu',
+ 'Password': 'hakurei',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'reimu',
+ 'Password': 'hakurei',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ }
+
+ def test_get_all_credentials_3_sources(self):
+ self.authconfig['credHelpers'] = {
+ 'registry1.io': 'truesecret',
+ }
+ truesecret = InMemoryStore('truesecret')
+ truesecret.store('registry1.io', 'reimu', 'hakurei')
+ self.authconfig._stores['truesecret'] = truesecret
+ self.authconfig.add_auth('registry2.io', {
+ 'ServerAddress': 'registry2.io',
+ 'Username': 'reimu',
+ 'Password': 'hakurei',
+ })
+
+ assert self.authconfig.get_all_credentials() == {
+ 'https://gensokyo.jp/v2': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'gensokyo.jp': {
+ 'Username': 'sakuya',
+ 'Password': 'izayoi',
+ 'ServerAddress': 'https://gensokyo.jp/v2',
+ },
+ 'https://default.com/v2': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'default.com': {
+ 'Username': 'user',
+ 'Password': 'hunter2',
+ 'ServerAddress': 'https://default.com/v2',
+ },
+ 'registry1.io': {
+ 'ServerAddress': 'registry1.io',
+ 'Username': 'reimu',
+ 'Password': 'hakurei',
+ },
+ 'registry2.io': {
+ 'ServerAddress': 'registry2.io',
+ 'Username': 'reimu',
+ 'Password': 'hakurei',
+ }
+ }
+
+
+class InMemoryStore(Store):
+ def __init__(self, *args, **kwargs):
+ self.__store = {}
+
+ def get(self, server):
+ try:
+ return self.__store[server]
+ except KeyError:
+ raise CredentialsNotFound()
+
+ def store(self, server, username, secret):
+ self.__store[server] = {
+ 'ServerURL': server,
+ 'Username': username,
+ 'Secret': secret,
+ }
+
+ def list(self):
+ return dict(
+ (k, v['Username']) for k, v in self.__store.items()
+ )
+
+ def erase(self, server):
+ del self.__store[server]
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_errors.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_errors.py
new file mode 100644
index 00000000..2cc114ed
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/test_errors.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+import sys
+
+import pytest
+import requests
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.errors import (
+ APIError, DockerException,
+ create_unexpected_kwargs_error,
+ create_api_error_from_http_exception,
+)
+
+
+class APIErrorTest(unittest.TestCase):
+ def test_api_error_is_caught_by_dockerexception(self):
+ try:
+ raise APIError("this should be caught by DockerException")
+ except DockerException:
+ pass
+
+ def test_status_code_200(self):
+ """The status_code property is present with 200 response."""
+ resp = requests.Response()
+ resp.status_code = 200
+ err = APIError('', response=resp)
+ assert err.status_code == 200
+
+ def test_status_code_400(self):
+ """The status_code property is present with 400 response."""
+ resp = requests.Response()
+ resp.status_code = 400
+ err = APIError('', response=resp)
+ assert err.status_code == 400
+
+ def test_status_code_500(self):
+ """The status_code property is present with 500 response."""
+ resp = requests.Response()
+ resp.status_code = 500
+ err = APIError('', response=resp)
+ assert err.status_code == 500
+
+ def test_is_server_error_200(self):
+ """Report not server error on 200 response."""
+ resp = requests.Response()
+ resp.status_code = 200
+ err = APIError('', response=resp)
+ assert err.is_server_error() is False
+
+ def test_is_server_error_300(self):
+ """Report not server error on 300 response."""
+ resp = requests.Response()
+ resp.status_code = 300
+ err = APIError('', response=resp)
+ assert err.is_server_error() is False
+
+ def test_is_server_error_400(self):
+ """Report not server error on 400 response."""
+ resp = requests.Response()
+ resp.status_code = 400
+ err = APIError('', response=resp)
+ assert err.is_server_error() is False
+
+ def test_is_server_error_500(self):
+ """Report server error on 500 response."""
+ resp = requests.Response()
+ resp.status_code = 500
+ err = APIError('', response=resp)
+ assert err.is_server_error() is True
+
+ def test_is_client_error_500(self):
+ """Report not client error on 500 response."""
+ resp = requests.Response()
+ resp.status_code = 500
+ err = APIError('', response=resp)
+ assert err.is_client_error() is False
+
+ def test_is_client_error_400(self):
+ """Report client error on 400 response."""
+ resp = requests.Response()
+ resp.status_code = 400
+ err = APIError('', response=resp)
+ assert err.is_client_error() is True
+
+ def test_is_error_300(self):
+ """Report no error on 300 response."""
+ resp = requests.Response()
+ resp.status_code = 300
+ err = APIError('', response=resp)
+ assert err.is_error() is False
+
+ def test_is_error_400(self):
+ """Report error on 400 response."""
+ resp = requests.Response()
+ resp.status_code = 400
+ err = APIError('', response=resp)
+ assert err.is_error() is True
+
+ def test_is_error_500(self):
+ """Report error on 500 response."""
+ resp = requests.Response()
+ resp.status_code = 500
+ err = APIError('', response=resp)
+ assert err.is_error() is True
+
+ def test_create_error_from_exception(self):
+ resp = requests.Response()
+ resp.status_code = 500
+ err = APIError('')
+ try:
+ resp.raise_for_status()
+ except requests.exceptions.HTTPError as e:
+ try:
+ create_api_error_from_http_exception(e)
+ except APIError as e:
+ err = e
+ assert err.is_server_error() is True
+
+
+class CreateUnexpectedKwargsErrorTest(unittest.TestCase):
+ def test_create_unexpected_kwargs_error_single(self):
+ e = create_unexpected_kwargs_error('f', {'foo': 'bar'})
+ assert str(e) == "f() got an unexpected keyword argument 'foo'"
+
+ def test_create_unexpected_kwargs_error_multiple(self):
+ e = create_unexpected_kwargs_error('f', {'foo': 'bar', 'baz': 'bosh'})
+ assert str(e) == "f() got unexpected keyword arguments 'baz', 'foo'"
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_sshconn.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_sshconn.py
new file mode 100644
index 00000000..e9189f3e
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_sshconn.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.transport.sshconn import SSHSocket, SSHHTTPAdapter
+
+
+class SSHAdapterTest(unittest.TestCase):
+ @staticmethod
+ def test_ssh_hostname_prefix_trim():
+ conn = SSHHTTPAdapter(
+ base_url="ssh://user@hostname:1234", shell_out=True)
+ assert conn.ssh_host == "user@hostname:1234"
+
+ @staticmethod
+ def test_ssh_parse_url():
+ c = SSHSocket(host="user@hostname:1234")
+ assert c.host == "hostname"
+ assert c.port == "1234"
+ assert c.user == "user"
+
+ @staticmethod
+ def test_ssh_parse_hostname_only():
+ c = SSHSocket(host="hostname")
+ assert c.host == "hostname"
+ assert c.port is None
+ assert c.user is None
+
+ @staticmethod
+ def test_ssh_parse_user_and_hostname():
+ c = SSHSocket(host="user@hostname")
+ assert c.host == "hostname"
+ assert c.port is None
+ assert c.user == "user"
+
+ @staticmethod
+ def test_ssh_parse_hostname_and_port():
+ c = SSHSocket(host="hostname:22")
+ assert c.host == "hostname"
+ assert c.port == "22"
+ assert c.user is None
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_ssladapter.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_ssladapter.py
new file mode 100644
index 00000000..428163e6
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/transport/test_ssladapter.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.transport import ssladapter
+
+try:
+ from backports.ssl_match_hostname import (
+ match_hostname, CertificateError
+ )
+except ImportError:
+ from ssl import (
+ match_hostname, CertificateError
+ )
+
+try:
+ from ssl import OP_NO_SSLv3, OP_NO_SSLv2, OP_NO_TLSv1
+except ImportError:
+ OP_NO_SSLv2 = 0x1000000
+ OP_NO_SSLv3 = 0x2000000
+ OP_NO_TLSv1 = 0x4000000
+
+
+class SSLAdapterTest(unittest.TestCase):
+ def test_only_uses_tls(self):
+ ssl_context = ssladapter.urllib3.util.ssl_.create_urllib3_context()
+
+ assert ssl_context.options & OP_NO_SSLv3
+ # if OpenSSL is compiled without SSL2 support, OP_NO_SSLv2 will be 0
+ assert not bool(OP_NO_SSLv2) or ssl_context.options & OP_NO_SSLv2
+ assert not ssl_context.options & OP_NO_TLSv1
+
+
+class MatchHostnameTest(unittest.TestCase):
+ cert = {
+ 'issuer': (
+ (('countryName', 'US'),),
+ (('stateOrProvinceName', 'California'),),
+ (('localityName', 'San Francisco'),),
+ (('organizationName', 'Docker Inc'),),
+ (('organizationalUnitName', 'Docker-Python'),),
+ (('commonName', 'localhost'),),
+ (('emailAddress', 'info@docker.com'),)
+ ),
+ 'notAfter': 'Mar 25 23:08:23 2030 GMT',
+ 'notBefore': 'Mar 25 23:08:23 2016 GMT',
+ 'serialNumber': 'BD5F894C839C548F',
+ 'subject': (
+ (('countryName', 'US'),),
+ (('stateOrProvinceName', 'California'),),
+ (('localityName', 'San Francisco'),),
+ (('organizationName', 'Docker Inc'),),
+ (('organizationalUnitName', 'Docker-Python'),),
+ (('commonName', 'localhost'),),
+ (('emailAddress', 'info@docker.com'),)
+ ),
+ 'subjectAltName': (
+ ('DNS', 'localhost'),
+ ('DNS', '*.gensokyo.jp'),
+ ('IP Address', '127.0.0.1'),
+ ),
+ 'version': 3
+ }
+
+ def test_match_ip_address_success(self):
+ assert match_hostname(self.cert, '127.0.0.1') is None
+
+ def test_match_localhost_success(self):
+ assert match_hostname(self.cert, 'localhost') is None
+
+ def test_match_dns_success(self):
+ assert match_hostname(self.cert, 'touhou.gensokyo.jp') is None
+
+ def test_match_ip_address_failure(self):
+ with pytest.raises(CertificateError):
+ match_hostname(self.cert, '192.168.0.25')
+
+ def test_match_dns_failure(self):
+ with pytest.raises(CertificateError):
+ match_hostname(self.cert, 'foobar.co.uk')
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_build.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_build.py
new file mode 100644
index 00000000..50eb703d
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_build.py
@@ -0,0 +1,515 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import os.path
+import shutil
+import socket
+import tarfile
+import tempfile
+import unittest
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.constants import IS_WINDOWS_PLATFORM
+from ansible_collections.community.docker.plugins.module_utils._api.utils.build import exclude_paths, tar
+
+
+def make_tree(dirs, files):
+ base = tempfile.mkdtemp()
+
+ for path in dirs:
+ os.makedirs(os.path.join(base, path))
+
+ for path in files:
+ with open(os.path.join(base, path), 'w') as f:
+ f.write("content")
+
+ return base
+
+
+def convert_paths(collection):
+ return set(map(convert_path, collection))
+
+
+def convert_path(path):
+ return path.replace('/', os.path.sep)
+
+
+class ExcludePathsTest(unittest.TestCase):
+ dirs = [
+ 'foo',
+ 'foo/bar',
+ 'bar',
+ 'target',
+ 'target/subdir',
+ 'subdir',
+ 'subdir/target',
+ 'subdir/target/subdir',
+ 'subdir/subdir2',
+ 'subdir/subdir2/target',
+ 'subdir/subdir2/target/subdir'
+ ]
+
+ files = [
+ 'Dockerfile',
+ 'Dockerfile.alt',
+ '.dockerignore',
+ 'a.py',
+ 'a.go',
+ 'b.py',
+ 'cde.py',
+ 'foo/a.py',
+ 'foo/b.py',
+ 'foo/bar/a.py',
+ 'bar/a.py',
+ 'foo/Dockerfile3',
+ 'target/file.txt',
+ 'target/subdir/file.txt',
+ 'subdir/file.txt',
+ 'subdir/target/file.txt',
+ 'subdir/target/subdir/file.txt',
+ 'subdir/subdir2/file.txt',
+ 'subdir/subdir2/target/file.txt',
+ 'subdir/subdir2/target/subdir/file.txt',
+ ]
+
+ all_paths = set(dirs + files)
+
+ def setUp(self):
+ self.base = make_tree(self.dirs, self.files)
+
+ def tearDown(self):
+ shutil.rmtree(self.base)
+
+ def exclude(self, patterns, dockerfile=None):
+ return set(exclude_paths(self.base, patterns, dockerfile=dockerfile))
+
+ def test_no_excludes(self):
+ assert self.exclude(['']) == convert_paths(self.all_paths)
+
+ def test_no_dupes(self):
+ paths = exclude_paths(self.base, ['!a.py'])
+ assert sorted(paths) == sorted(set(paths))
+
+ def test_wildcard_exclude(self):
+ assert self.exclude(['*']) == set(['Dockerfile', '.dockerignore'])
+
+ def test_exclude_dockerfile_dockerignore(self):
+ """
+ Even if the .dockerignore file explicitly says to exclude
+ Dockerfile and/or .dockerignore, don't exclude them from
+ the actual tar file.
+ """
+ assert self.exclude(['Dockerfile', '.dockerignore']) == convert_paths(
+ self.all_paths
+ )
+
+ def test_exclude_custom_dockerfile(self):
+ """
+ If we're using a custom Dockerfile, make sure that's not
+ excluded.
+ """
+ assert self.exclude(['*'], dockerfile='Dockerfile.alt') == set(['Dockerfile.alt', '.dockerignore'])
+
+ assert self.exclude(
+ ['*'], dockerfile='foo/Dockerfile3'
+ ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore']))
+
+ # https://github.com/docker/docker-py/issues/1956
+ assert self.exclude(
+ ['*'], dockerfile='./foo/Dockerfile3'
+ ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore']))
+
+ def test_exclude_dockerfile_child(self):
+ includes = self.exclude(['foo/'], dockerfile='foo/Dockerfile3')
+ assert convert_path('foo/Dockerfile3') in includes
+ assert convert_path('foo/a.py') not in includes
+
+ def test_single_filename(self):
+ assert self.exclude(['a.py']) == convert_paths(
+ self.all_paths - set(['a.py'])
+ )
+
+ def test_single_filename_leading_dot_slash(self):
+ assert self.exclude(['./a.py']) == convert_paths(
+ self.all_paths - set(['a.py'])
+ )
+
+ # As odd as it sounds, a filename pattern with a trailing slash on the
+ # end *will* result in that file being excluded.
+ def test_single_filename_trailing_slash(self):
+ assert self.exclude(['a.py/']) == convert_paths(
+ self.all_paths - set(['a.py'])
+ )
+
+ def test_wildcard_filename_start(self):
+ assert self.exclude(['*.py']) == convert_paths(
+ self.all_paths - set(['a.py', 'b.py', 'cde.py'])
+ )
+
+ def test_wildcard_with_exception(self):
+ assert self.exclude(['*.py', '!b.py']) == convert_paths(
+ self.all_paths - set(['a.py', 'cde.py'])
+ )
+
+ def test_wildcard_with_wildcard_exception(self):
+ assert self.exclude(['*.*', '!*.go']) == convert_paths(
+ self.all_paths - set([
+ 'a.py', 'b.py', 'cde.py', 'Dockerfile.alt',
+ ])
+ )
+
+ def test_wildcard_filename_end(self):
+ assert self.exclude(['a.*']) == convert_paths(
+ self.all_paths - set(['a.py', 'a.go'])
+ )
+
+ def test_question_mark(self):
+ assert self.exclude(['?.py']) == convert_paths(
+ self.all_paths - set(['a.py', 'b.py'])
+ )
+
+ def test_single_subdir_single_filename(self):
+ assert self.exclude(['foo/a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py'])
+ )
+
+ def test_single_subdir_single_filename_leading_slash(self):
+ assert self.exclude(['/foo/a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py'])
+ )
+
+ def test_exclude_include_absolute_path(self):
+ base = make_tree([], ['a.py', 'b.py'])
+ assert exclude_paths(
+ base,
+ ['/*', '!/*.py']
+ ) == set(['a.py', 'b.py'])
+
+ def test_single_subdir_with_path_traversal(self):
+ assert self.exclude(['foo/whoops/../a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py'])
+ )
+
+ def test_single_subdir_wildcard_filename(self):
+ assert self.exclude(['foo/*.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py', 'foo/b.py'])
+ )
+
+ def test_wildcard_subdir_single_filename(self):
+ assert self.exclude(['*/a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py', 'bar/a.py'])
+ )
+
+ def test_wildcard_subdir_wildcard_filename(self):
+ assert self.exclude(['*/*.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py', 'foo/b.py', 'bar/a.py'])
+ )
+
+ def test_directory(self):
+ assert self.exclude(['foo']) == convert_paths(
+ self.all_paths - set([
+ 'foo', 'foo/a.py', 'foo/b.py', 'foo/bar', 'foo/bar/a.py',
+ 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_trailing_slash(self):
+ assert self.exclude(['foo']) == convert_paths(
+ self.all_paths - set([
+ 'foo', 'foo/a.py', 'foo/b.py',
+ 'foo/bar', 'foo/bar/a.py', 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_single_exception(self):
+ assert self.exclude(['foo', '!foo/bar/a.py']) == convert_paths(
+ self.all_paths - set([
+ 'foo/a.py', 'foo/b.py', 'foo', 'foo/bar',
+ 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_subdir_exception(self):
+ assert self.exclude(['foo', '!foo/bar']) == convert_paths(
+ self.all_paths - set([
+ 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3'
+ ])
+ )
+
+ @pytest.mark.skipif(
+ not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows'
+ )
+ def test_directory_with_subdir_exception_win32_pathsep(self):
+ assert self.exclude(['foo', '!foo\\bar']) == convert_paths(
+ self.all_paths - set([
+ 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_wildcard_exception(self):
+ assert self.exclude(['foo', '!foo/*.py']) == convert_paths(
+ self.all_paths - set([
+ 'foo/bar', 'foo/bar/a.py', 'foo', 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_subdirectory(self):
+ assert self.exclude(['foo/bar']) == convert_paths(
+ self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
+ )
+
+ @pytest.mark.skipif(
+ not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows'
+ )
+ def test_subdirectory_win32_pathsep(self):
+ assert self.exclude(['foo\\bar']) == convert_paths(
+ self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
+ )
+
+ def test_double_wildcard(self):
+ assert self.exclude(['**/a.py']) == convert_paths(
+ self.all_paths - set([
+ 'a.py', 'foo/a.py', 'foo/bar/a.py', 'bar/a.py'
+ ])
+ )
+
+ assert self.exclude(['foo/**/bar']) == convert_paths(
+ self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
+ )
+
+ def test_single_and_double_wildcard(self):
+ assert self.exclude(['**/target/*/*']) == convert_paths(
+ self.all_paths - set([
+ 'target/subdir/file.txt',
+ 'subdir/target/subdir/file.txt',
+ 'subdir/subdir2/target/subdir/file.txt',
+ ])
+ )
+
+ def test_trailing_double_wildcard(self):
+ assert self.exclude(['subdir/**']) == convert_paths(
+ self.all_paths - set([
+ 'subdir/file.txt',
+ 'subdir/target/file.txt',
+ 'subdir/target/subdir/file.txt',
+ 'subdir/subdir2/file.txt',
+ 'subdir/subdir2/target/file.txt',
+ 'subdir/subdir2/target/subdir/file.txt',
+ 'subdir/target',
+ 'subdir/target/subdir',
+ 'subdir/subdir2',
+ 'subdir/subdir2/target',
+ 'subdir/subdir2/target/subdir',
+ ])
+ )
+
+ def test_double_wildcard_with_exception(self):
+ assert self.exclude(['**', '!bar', '!foo/bar']) == convert_paths(
+ set([
+ 'foo/bar', 'foo/bar/a.py', 'bar', 'bar/a.py', 'Dockerfile',
+ '.dockerignore',
+ ])
+ )
+
+ def test_include_wildcard(self):
+ # This may be surprising but it matches the CLI's behavior
+ # (tested with 18.05.0-ce on linux)
+ base = make_tree(['a'], ['a/b.py'])
+ assert exclude_paths(
+ base,
+ ['*', '!*/b.py']
+ ) == set()
+
+ def test_last_line_precedence(self):
+ base = make_tree(
+ [],
+ ['garbage.md',
+ 'trash.md',
+ 'README.md',
+ 'README-bis.md',
+ 'README-secret.md'])
+ assert exclude_paths(
+ base,
+ ['*.md', '!README*.md', 'README-secret.md']
+ ) == set(['README.md', 'README-bis.md'])
+
+ def test_parent_directory(self):
+ base = make_tree(
+ [],
+ ['a.py',
+ 'b.py',
+ 'c.py'])
+ # Dockerignore reference stipulates that absolute paths are
+ # equivalent to relative paths, hence /../foo should be
+ # equivalent to ../foo. It also stipulates that paths are run
+ # through Go's filepath.Clean, which explicitly "replace
+ # "/.." by "/" at the beginning of a path".
+ assert exclude_paths(
+ base,
+ ['../a.py', '/../b.py']
+ ) == set(['c.py'])
+
+
+class TarTest(unittest.TestCase):
+ def test_tar_with_excludes(self):
+ dirs = [
+ 'foo',
+ 'foo/bar',
+ 'bar',
+ ]
+
+ files = [
+ 'Dockerfile',
+ 'Dockerfile.alt',
+ '.dockerignore',
+ 'a.py',
+ 'a.go',
+ 'b.py',
+ 'cde.py',
+ 'foo/a.py',
+ 'foo/b.py',
+ 'foo/bar/a.py',
+ 'bar/a.py',
+ ]
+
+ exclude = [
+ '*.py',
+ '!b.py',
+ '!a.go',
+ 'foo',
+ 'Dockerfile*',
+ '.dockerignore',
+ ]
+
+ expected_names = set([
+ 'Dockerfile',
+ '.dockerignore',
+ 'a.go',
+ 'b.py',
+ 'bar',
+ 'bar/a.py',
+ ])
+
+ base = make_tree(dirs, files)
+ self.addCleanup(shutil.rmtree, base)
+
+ with tar(base, exclude=exclude) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == sorted(expected_names)
+
+ def test_tar_with_empty_directory(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'foo']
+
+ @pytest.mark.skipif(
+ IS_WINDOWS_PLATFORM or os.geteuid() == 0,
+ reason='root user always has access ; no chmod on Windows'
+ )
+ def test_tar_with_inaccessible_file(self):
+ base = tempfile.mkdtemp()
+ full_path = os.path.join(base, 'foo')
+ self.addCleanup(shutil.rmtree, base)
+ with open(full_path, 'w') as f:
+ f.write('content')
+ os.chmod(full_path, 0o222)
+ with pytest.raises(IOError) as ei:
+ tar(base)
+
+ assert 'Can not read file in context: {full_path}'.format(full_path=full_path) in (
+ ei.exconly()
+ )
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_with_file_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ with open(os.path.join(base, 'foo'), 'w') as f:
+ f.write("content")
+ os.makedirs(os.path.join(base, 'bar'))
+ os.symlink('../foo', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_with_directory_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ os.symlink('../foo', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_with_broken_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+
+ os.symlink('../baz', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No UNIX sockets on Win32')
+ def test_tar_socket_file(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ sock = socket.socket(socket.AF_UNIX)
+ self.addCleanup(sock.close)
+ sock.bind(os.path.join(base, 'test.sock'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'foo']
+
+ def tar_test_negative_mtime_bug(self):
+ base = tempfile.mkdtemp()
+ filename = os.path.join(base, 'th.txt')
+ self.addCleanup(shutil.rmtree, base)
+ with open(filename, 'w') as f:
+ f.write('Invisible Full Moon')
+ os.utime(filename, (12345, -3600.0))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert tar_data.getnames() == ['th.txt']
+ assert tar_data.getmember('th.txt').mtime == -3600
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_directory_link(self):
+ dirs = ['a', 'b', 'a/c']
+ files = ['a/hello.py', 'b/utils.py', 'a/c/descend.py']
+ base = make_tree(dirs, files)
+ self.addCleanup(shutil.rmtree, base)
+ os.symlink(os.path.join(base, 'b'), os.path.join(base, 'a/c/b'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ names = tar_data.getnames()
+ for member in dirs + files:
+ assert member in names
+ assert 'a/c/b' in names
+ assert 'a/c/b/utils.py' not in names
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_config.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_config.py
new file mode 100644
index 00000000..9448f384
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_config.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+import unittest
+import shutil
+import tempfile
+import json
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from pytest import mark, fixture
+
+from ansible_collections.community.docker.plugins.module_utils._api.utils import config
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class FindConfigFileTest(unittest.TestCase):
+
+ @fixture(autouse=True)
+ def tmpdir(self, tmpdir):
+ self.mkdir = tmpdir.mkdir
+
+ def test_find_config_fallback(self):
+ tmpdir = self.mkdir('test_find_config_fallback')
+
+ with mock.patch.dict(os.environ, {'HOME': str(tmpdir)}):
+ assert config.find_config_file() is None
+
+ def test_find_config_from_explicit_path(self):
+ tmpdir = self.mkdir('test_find_config_from_explicit_path')
+ config_path = tmpdir.ensure('my-config-file.json')
+
+ assert config.find_config_file(str(config_path)) == str(config_path)
+
+ def test_find_config_from_environment(self):
+ tmpdir = self.mkdir('test_find_config_from_environment')
+ config_path = tmpdir.ensure('config.json')
+
+ with mock.patch.dict(os.environ, {'DOCKER_CONFIG': str(tmpdir)}):
+ assert config.find_config_file() == str(config_path)
+
+ @mark.skipif("sys.platform == 'win32'")
+ def test_find_config_from_home_posix(self):
+ tmpdir = self.mkdir('test_find_config_from_home_posix')
+ config_path = tmpdir.ensure('.docker', 'config.json')
+
+ with mock.patch.dict(os.environ, {'HOME': str(tmpdir)}):
+ assert config.find_config_file() == str(config_path)
+
+ @mark.skipif("sys.platform == 'win32'")
+ def test_find_config_from_home_legacy_name(self):
+ tmpdir = self.mkdir('test_find_config_from_home_legacy_name')
+ config_path = tmpdir.ensure('.dockercfg')
+
+ with mock.patch.dict(os.environ, {'HOME': str(tmpdir)}):
+ assert config.find_config_file() == str(config_path)
+
+ @mark.skipif("sys.platform != 'win32'")
+ def test_find_config_from_home_windows(self):
+ tmpdir = self.mkdir('test_find_config_from_home_windows')
+ config_path = tmpdir.ensure('.docker', 'config.json')
+
+ with mock.patch.dict(os.environ, {'USERPROFILE': str(tmpdir)}):
+ assert config.find_config_file() == str(config_path)
+
+
+class LoadConfigTest(unittest.TestCase):
+ def test_load_config_no_file(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg = config.load_general_config(folder)
+ assert cfg is not None
+ assert isinstance(cfg, dict)
+ assert not cfg
+
+ def test_load_config_custom_headers(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+
+ dockercfg_path = os.path.join(folder, 'config.json')
+ config_data = {
+ 'HttpHeaders': {
+ 'Name': 'Spike',
+ 'Surname': 'Spiegel'
+ },
+ }
+
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config_data, f)
+
+ cfg = config.load_general_config(dockercfg_path)
+ assert 'HttpHeaders' in cfg
+ assert cfg['HttpHeaders'] == {
+ 'Name': 'Spike',
+ 'Surname': 'Spiegel'
+ }
+
+ def test_load_config_detach_keys(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ dockercfg_path = os.path.join(folder, 'config.json')
+ config_data = {
+ 'detachKeys': 'ctrl-q, ctrl-u, ctrl-i'
+ }
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config_data, f)
+
+ cfg = config.load_general_config(dockercfg_path)
+ assert cfg == config_data
+
+ def test_load_config_from_env(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ dockercfg_path = os.path.join(folder, 'config.json')
+ config_data = {
+ 'detachKeys': 'ctrl-q, ctrl-u, ctrl-i'
+ }
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config_data, f)
+
+ with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
+ cfg = config.load_general_config(None)
+ assert cfg == config_data
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_decorators.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_decorators.py
new file mode 100644
index 00000000..8ba1ec5f
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_decorators.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.api.client import APIClient
+from ansible_collections.community.docker.plugins.module_utils._api.constants import DEFAULT_DOCKER_API_VERSION
+from ansible_collections.community.docker.plugins.module_utils._api.utils.decorators import update_headers
+
+
+class DecoratorsTest(unittest.TestCase):
+ def test_update_headers(self):
+ sample_headers = {
+ 'X-Docker-Locale': 'en-US',
+ }
+
+ def f(self, headers=None):
+ return headers
+
+ client = APIClient(version=DEFAULT_DOCKER_API_VERSION)
+ client._general_configs = {}
+
+ g = update_headers(f)
+ assert g(client, headers=None) is None
+ assert g(client, headers={}) == {}
+ assert g(client, headers={'Content-type': 'application/json'}) == {
+ 'Content-type': 'application/json',
+ }
+
+ client._general_configs = {
+ 'HttpHeaders': sample_headers
+ }
+
+ assert g(client, headers=None) == sample_headers
+ assert g(client, headers={}) == sample_headers
+ assert g(client, headers={'Content-type': 'application/json'}) == {
+ 'Content-type': 'application/json',
+ 'X-Docker-Locale': 'en-US',
+ }
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_json_stream.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_json_stream.py
new file mode 100644
index 00000000..2d7f300f
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_json_stream.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.utils.json_stream import json_splitter, stream_as_text, json_stream
+
+
+class TestJsonSplitter:
+
+ def test_json_splitter_no_object(self):
+ data = '{"foo": "bar'
+ assert json_splitter(data) is None
+
+ def test_json_splitter_with_object(self):
+ data = '{"foo": "bar"}\n \n{"next": "obj"}'
+ assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')
+
+ def test_json_splitter_leading_whitespace(self):
+ data = '\n \r{"foo": "bar"}\n\n {"next": "obj"}'
+ assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')
+
+
+class TestStreamAsText:
+
+ def test_stream_with_non_utf_unicode_character(self):
+ stream = [b'\xed\xf3\xf3']
+ output, = stream_as_text(stream)
+ assert output == u'���'
+
+ def test_stream_with_utf_character(self):
+ stream = [u'ěĝ'.encode('utf-8')]
+ output, = stream_as_text(stream)
+ assert output == u'ěĝ'
+
+
+class TestJsonStream:
+
+ def test_with_falsy_entries(self):
+ stream = [
+ '{"one": "two"}\n{}\n',
+ "[1, 2, 3]\n[]\n",
+ ]
+ output = list(json_stream(stream))
+ assert output == [
+ {'one': 'two'},
+ {},
+ [1, 2, 3],
+ [],
+ ]
+
+ def test_with_leading_whitespace(self):
+ stream = [
+ '\n \r\n {"one": "two"}{"x": 1}',
+ ' {"three": "four"}\t\t{"x": 2}'
+ ]
+ output = list(json_stream(stream))
+ assert output == [
+ {'one': 'two'},
+ {'x': 1},
+ {'three': 'four'},
+ {'x': 2}
+ ]
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_ports.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_ports.py
new file mode 100644
index 00000000..c1a08a12
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_ports.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.utils.ports import build_port_bindings, split_port
+
+
+class PortsTest(unittest.TestCase):
+ def test_split_port_with_host_ip(self):
+ internal_port, external_port = split_port("127.0.0.1:1000:2000")
+ assert internal_port == ["2000"]
+ assert external_port == [("127.0.0.1", "1000")]
+
+ def test_split_port_with_protocol(self):
+ for protocol in ['tcp', 'udp', 'sctp']:
+ internal_port, external_port = split_port(
+ "127.0.0.1:1000:2000/" + protocol
+ )
+ assert internal_port == ["2000/" + protocol]
+ assert external_port == [("127.0.0.1", "1000")]
+
+ def test_split_port_with_host_ip_no_port(self):
+ internal_port, external_port = split_port("127.0.0.1::2000")
+ assert internal_port == ["2000"]
+ assert external_port == [("127.0.0.1", None)]
+
+ def test_split_port_range_with_host_ip_no_port(self):
+ internal_port, external_port = split_port("127.0.0.1::2000-2001")
+ assert internal_port == ["2000", "2001"]
+ assert external_port == [("127.0.0.1", None), ("127.0.0.1", None)]
+
+ def test_split_port_with_host_port(self):
+ internal_port, external_port = split_port("1000:2000")
+ assert internal_port == ["2000"]
+ assert external_port == ["1000"]
+
+ def test_split_port_range_with_host_port(self):
+ internal_port, external_port = split_port("1000-1001:2000-2001")
+ assert internal_port == ["2000", "2001"]
+ assert external_port == ["1000", "1001"]
+
+ def test_split_port_random_port_range_with_host_port(self):
+ internal_port, external_port = split_port("1000-1001:2000")
+ assert internal_port == ["2000"]
+ assert external_port == ["1000-1001"]
+
+ def test_split_port_no_host_port(self):
+ internal_port, external_port = split_port("2000")
+ assert internal_port == ["2000"]
+ assert external_port is None
+
+ def test_split_port_range_no_host_port(self):
+ internal_port, external_port = split_port("2000-2001")
+ assert internal_port == ["2000", "2001"]
+ assert external_port is None
+
+ def test_split_port_range_with_protocol(self):
+ internal_port, external_port = split_port(
+ "127.0.0.1:1000-1001:2000-2001/udp")
+ assert internal_port == ["2000/udp", "2001/udp"]
+ assert external_port == [("127.0.0.1", "1000"), ("127.0.0.1", "1001")]
+
+ def test_split_port_with_ipv6_address(self):
+ internal_port, external_port = split_port(
+ "2001:abcd:ef00::2:1000:2000")
+ assert internal_port == ["2000"]
+ assert external_port == [("2001:abcd:ef00::2", "1000")]
+
+ def test_split_port_with_ipv6_square_brackets_address(self):
+ internal_port, external_port = split_port(
+ "[2001:abcd:ef00::2]:1000:2000")
+ assert internal_port == ["2000"]
+ assert external_port == [("2001:abcd:ef00::2", "1000")]
+
+ def test_split_port_invalid(self):
+ with pytest.raises(ValueError):
+ split_port("0.0.0.0:1000:2000:tcp")
+
+ def test_split_port_invalid_protocol(self):
+ with pytest.raises(ValueError):
+ split_port("0.0.0.0:1000:2000/ftp")
+
+ def test_non_matching_length_port_ranges(self):
+ with pytest.raises(ValueError):
+ split_port("0.0.0.0:1000-1010:2000-2002/tcp")
+
+ def test_port_and_range_invalid(self):
+ with pytest.raises(ValueError):
+ split_port("0.0.0.0:1000:2000-2002/tcp")
+
+ def test_port_only_with_colon(self):
+ with pytest.raises(ValueError):
+ split_port(":80")
+
+ def test_host_only_with_colon(self):
+ with pytest.raises(ValueError):
+ split_port("localhost:")
+
+ def test_with_no_container_port(self):
+ with pytest.raises(ValueError):
+ split_port("localhost:80:")
+
+ def test_split_port_empty_string(self):
+ with pytest.raises(ValueError):
+ split_port("")
+
+ def test_split_port_non_string(self):
+ assert split_port(1243) == (['1243'], None)
+
+ def test_build_port_bindings_with_one_port(self):
+ port_bindings = build_port_bindings(["127.0.0.1:1000:1000"])
+ assert port_bindings["1000"] == [("127.0.0.1", "1000")]
+
+ def test_build_port_bindings_with_matching_internal_ports(self):
+ port_bindings = build_port_bindings(
+ ["127.0.0.1:1000:1000", "127.0.0.1:2000:1000"])
+ assert port_bindings["1000"] == [
+ ("127.0.0.1", "1000"), ("127.0.0.1", "2000")
+ ]
+
+ def test_build_port_bindings_with_nonmatching_internal_ports(self):
+ port_bindings = build_port_bindings(
+ ["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
+ assert port_bindings["1000"] == [("127.0.0.1", "1000")]
+ assert port_bindings["2000"] == [("127.0.0.1", "2000")]
+
+ def test_build_port_bindings_with_port_range(self):
+ port_bindings = build_port_bindings(["127.0.0.1:1000-1001:1000-1001"])
+ assert port_bindings["1000"] == [("127.0.0.1", "1000")]
+ assert port_bindings["1001"] == [("127.0.0.1", "1001")]
+
+ def test_build_port_bindings_with_matching_internal_port_ranges(self):
+ port_bindings = build_port_bindings(
+ ["127.0.0.1:1000-1001:1000-1001", "127.0.0.1:2000-2001:1000-1001"])
+ assert port_bindings["1000"] == [
+ ("127.0.0.1", "1000"), ("127.0.0.1", "2000")
+ ]
+ assert port_bindings["1001"] == [
+ ("127.0.0.1", "1001"), ("127.0.0.1", "2001")
+ ]
+
+ def test_build_port_bindings_with_nonmatching_internal_port_ranges(self):
+ port_bindings = build_port_bindings(
+ ["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
+ assert port_bindings["1000"] == [("127.0.0.1", "1000")]
+ assert port_bindings["2000"] == [("127.0.0.1", "2000")]
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_proxy.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_proxy.py
new file mode 100644
index 00000000..0eb24270
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_proxy.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import unittest
+import sys
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.utils.proxy import ProxyConfig
+
+
+HTTP = 'http://test:80'
+HTTPS = 'https://test:443'
+FTP = 'ftp://user:password@host:23'
+NO_PROXY = 'localhost,.localdomain'
+CONFIG = ProxyConfig(http=HTTP, https=HTTPS, ftp=FTP, no_proxy=NO_PROXY)
+ENV = {
+ 'http_proxy': HTTP,
+ 'HTTP_PROXY': HTTP,
+ 'https_proxy': HTTPS,
+ 'HTTPS_PROXY': HTTPS,
+ 'ftp_proxy': FTP,
+ 'FTP_PROXY': FTP,
+ 'no_proxy': NO_PROXY,
+ 'NO_PROXY': NO_PROXY,
+}
+
+
+class ProxyConfigTest(unittest.TestCase):
+
+ def test_from_dict(self):
+ config = ProxyConfig.from_dict({
+ 'httpProxy': HTTP,
+ 'httpsProxy': HTTPS,
+ 'ftpProxy': FTP,
+ 'noProxy': NO_PROXY
+ })
+ self.assertEqual(CONFIG.http, config.http)
+ self.assertEqual(CONFIG.https, config.https)
+ self.assertEqual(CONFIG.ftp, config.ftp)
+ self.assertEqual(CONFIG.no_proxy, config.no_proxy)
+
+ def test_new(self):
+ config = ProxyConfig()
+ self.assertIsNone(config.http)
+ self.assertIsNone(config.https)
+ self.assertIsNone(config.ftp)
+ self.assertIsNone(config.no_proxy)
+
+ config = ProxyConfig(http='a', https='b', ftp='c', no_proxy='d')
+ self.assertEqual(config.http, 'a')
+ self.assertEqual(config.https, 'b')
+ self.assertEqual(config.ftp, 'c')
+ self.assertEqual(config.no_proxy, 'd')
+
+ def test_truthiness(self):
+ assert not ProxyConfig()
+ assert ProxyConfig(http='non-zero')
+ assert ProxyConfig(https='non-zero')
+ assert ProxyConfig(ftp='non-zero')
+ assert ProxyConfig(no_proxy='non-zero')
+
+ def test_environment(self):
+ self.assertDictEqual(CONFIG.get_environment(), ENV)
+ empty = ProxyConfig()
+ self.assertDictEqual(empty.get_environment(), {})
+
+ def test_inject_proxy_environment(self):
+ # Proxy config is non null, env is None.
+ self.assertSetEqual(
+ set(CONFIG.inject_proxy_environment(None)),
+ set('{k}={v}'.format(k=k, v=v) for k, v in ENV.items()))
+
+ # Proxy config is null, env is None.
+ self.assertIsNone(ProxyConfig().inject_proxy_environment(None), None)
+
+ env = ['FOO=BAR', 'BAR=BAZ']
+
+ # Proxy config is non null, env is non null
+ actual = CONFIG.inject_proxy_environment(env)
+ expected = ['{k}={v}'.format(k=k, v=v) for k, v in ENV.items()] + env
+ # It's important that the first 8 variables are the ones from the proxy
+ # config, and the last 2 are the ones from the input environment
+ self.assertSetEqual(set(actual[:8]), set(expected[:8]))
+ self.assertSetEqual(set(actual[-2:]), set(expected[-2:]))
+
+ # Proxy is null, and is non null
+ self.assertListEqual(ProxyConfig().inject_proxy_environment(env), env)
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_utils.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_utils.py
new file mode 100644
index 00000000..cc0dc695
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/test_utils.py
@@ -0,0 +1,488 @@
+# -*- coding: utf-8 -*-
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import base64
+import json
+import os
+import os.path
+import shutil
+import tempfile
+import unittest
+import sys
+
+from ansible.module_utils.six import PY3
+
+import pytest
+
+if sys.version_info < (2, 7):
+ pytestmark = pytest.mark.skip('Python 2.6 is not supported')
+
+from ansible_collections.community.docker.plugins.module_utils._api.api.client import APIClient
+from ansible_collections.community.docker.plugins.module_utils._api.constants import IS_WINDOWS_PLATFORM, DEFAULT_DOCKER_API_VERSION
+from ansible_collections.community.docker.plugins.module_utils._api.errors import DockerException
+from ansible_collections.community.docker.plugins.module_utils._api.utils.utils import (
+ convert_filters, convert_volume_binds,
+ decode_json_header, kwargs_from_env, parse_bytes,
+ parse_devices, parse_env_file, parse_host,
+ parse_repository_tag, split_command, format_environment,
+)
+
+
+TEST_CERT_DIR = os.path.join(
+ os.path.dirname(__file__),
+ 'testdata/certs',
+)
+
+
+class KwargsFromEnvTest(unittest.TestCase):
+ def setUp(self):
+ self.os_environ = os.environ.copy()
+
+ def tearDown(self):
+ os.environ = self.os_environ
+
+ def test_kwargs_from_env_empty(self):
+ os.environ.update(DOCKER_HOST='',
+ DOCKER_CERT_PATH='')
+ os.environ.pop('DOCKER_TLS_VERIFY', None)
+
+ kwargs = kwargs_from_env()
+ assert kwargs.get('base_url') is None
+ assert kwargs.get('tls') is None
+
+ def test_kwargs_from_env_tls(self):
+ os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
+ DOCKER_CERT_PATH=TEST_CERT_DIR,
+ DOCKER_TLS_VERIFY='1')
+ kwargs = kwargs_from_env(assert_hostname=False)
+ assert 'tcp://192.168.59.103:2376' == kwargs['base_url']
+ assert 'ca.pem' in kwargs['tls'].ca_cert
+ assert 'cert.pem' in kwargs['tls'].cert[0]
+ assert 'key.pem' in kwargs['tls'].cert[1]
+ assert kwargs['tls'].assert_hostname is False
+ assert kwargs['tls'].verify
+
+ parsed_host = parse_host(kwargs['base_url'], IS_WINDOWS_PLATFORM, True)
+ kwargs['version'] = DEFAULT_DOCKER_API_VERSION
+ try:
+ client = APIClient(**kwargs)
+ assert parsed_host == client.base_url
+ assert kwargs['tls'].ca_cert == client.verify
+ assert kwargs['tls'].cert == client.cert
+ except TypeError as e:
+ self.fail(e)
+
+ def test_kwargs_from_env_tls_verify_false(self):
+ os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
+ DOCKER_CERT_PATH=TEST_CERT_DIR,
+ DOCKER_TLS_VERIFY='')
+ kwargs = kwargs_from_env(assert_hostname=True)
+ assert 'tcp://192.168.59.103:2376' == kwargs['base_url']
+ assert 'ca.pem' in kwargs['tls'].ca_cert
+ assert 'cert.pem' in kwargs['tls'].cert[0]
+ assert 'key.pem' in kwargs['tls'].cert[1]
+ assert kwargs['tls'].assert_hostname is True
+ assert kwargs['tls'].verify is False
+ parsed_host = parse_host(kwargs['base_url'], IS_WINDOWS_PLATFORM, True)
+ kwargs['version'] = DEFAULT_DOCKER_API_VERSION
+ try:
+ client = APIClient(**kwargs)
+ assert parsed_host == client.base_url
+ assert kwargs['tls'].cert == client.cert
+ assert not kwargs['tls'].verify
+ except TypeError as e:
+ self.fail(e)
+
+ def test_kwargs_from_env_tls_verify_false_no_cert(self):
+ temp_dir = tempfile.mkdtemp()
+ cert_dir = os.path.join(temp_dir, '.docker')
+ shutil.copytree(TEST_CERT_DIR, cert_dir)
+
+ os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
+ HOME=temp_dir,
+ DOCKER_TLS_VERIFY='')
+ os.environ.pop('DOCKER_CERT_PATH', None)
+ kwargs = kwargs_from_env(assert_hostname=True)
+ assert 'tcp://192.168.59.103:2376' == kwargs['base_url']
+
+ def test_kwargs_from_env_no_cert_path(self):
+ try:
+ temp_dir = tempfile.mkdtemp()
+ cert_dir = os.path.join(temp_dir, '.docker')
+ shutil.copytree(TEST_CERT_DIR, cert_dir)
+
+ os.environ.update(HOME=temp_dir,
+ DOCKER_CERT_PATH='',
+ DOCKER_TLS_VERIFY='1')
+
+ kwargs = kwargs_from_env()
+ assert kwargs['tls'].verify
+ assert cert_dir in kwargs['tls'].ca_cert
+ assert cert_dir in kwargs['tls'].cert[0]
+ assert cert_dir in kwargs['tls'].cert[1]
+ finally:
+ if temp_dir:
+ shutil.rmtree(temp_dir)
+
+ def test_kwargs_from_env_alternate_env(self):
+ # Values in os.environ are entirely ignored if an alternate is
+ # provided
+ os.environ.update(
+ DOCKER_HOST='tcp://192.168.59.103:2376',
+ DOCKER_CERT_PATH=TEST_CERT_DIR,
+ DOCKER_TLS_VERIFY=''
+ )
+ kwargs = kwargs_from_env(environment={
+ 'DOCKER_HOST': 'http://docker.gensokyo.jp:2581',
+ })
+ assert 'http://docker.gensokyo.jp:2581' == kwargs['base_url']
+ assert 'tls' not in kwargs
+
+
+class ConverVolumeBindsTest(unittest.TestCase):
+ def test_convert_volume_binds_empty(self):
+ assert convert_volume_binds({}) == []
+ assert convert_volume_binds([]) == []
+
+ def test_convert_volume_binds_list(self):
+ data = ['/a:/a:ro', '/b:/c:z']
+ assert convert_volume_binds(data) == data
+
+ def test_convert_volume_binds_complete(self):
+ data = {
+ '/mnt/vol1': {
+ 'bind': '/data',
+ 'mode': 'ro'
+ }
+ }
+ assert convert_volume_binds(data) == ['/mnt/vol1:/data:ro']
+
+ def test_convert_volume_binds_compact(self):
+ data = {
+ '/mnt/vol1': '/data'
+ }
+ assert convert_volume_binds(data) == ['/mnt/vol1:/data:rw']
+
+ def test_convert_volume_binds_no_mode(self):
+ data = {
+ '/mnt/vol1': {
+ 'bind': '/data'
+ }
+ }
+ assert convert_volume_binds(data) == ['/mnt/vol1:/data:rw']
+
+ def test_convert_volume_binds_unicode_bytes_input(self):
+ expected = [u'/mnt/지연:/unicode/박:rw']
+
+ data = {
+ u'/mnt/지연'.encode('utf-8'): {
+ 'bind': u'/unicode/박'.encode('utf-8'),
+ 'mode': u'rw'
+ }
+ }
+ assert convert_volume_binds(data) == expected
+
+ def test_convert_volume_binds_unicode_unicode_input(self):
+ expected = [u'/mnt/지연:/unicode/박:rw']
+
+ data = {
+ u'/mnt/지연': {
+ 'bind': u'/unicode/박',
+ 'mode': u'rw'
+ }
+ }
+ assert convert_volume_binds(data) == expected
+
+
+class ParseEnvFileTest(unittest.TestCase):
+ def generate_tempfile(self, file_content=None):
+ """
+ Generates a temporary file for tests with the content
+ of 'file_content' and returns the filename.
+ Don't forget to unlink the file with os.unlink() after.
+ """
+ local_tempfile = tempfile.NamedTemporaryFile(delete=False)
+ local_tempfile.write(file_content.encode('UTF-8'))
+ local_tempfile.close()
+ return local_tempfile.name
+
+ def test_parse_env_file_proper(self):
+ env_file = self.generate_tempfile(
+ file_content='USER=jdoe\nPASS=secret')
+ get_parse_env_file = parse_env_file(env_file)
+ assert get_parse_env_file == {'USER': 'jdoe', 'PASS': 'secret'}
+ os.unlink(env_file)
+
+ def test_parse_env_file_with_equals_character(self):
+ env_file = self.generate_tempfile(
+ file_content='USER=jdoe\nPASS=sec==ret')
+ get_parse_env_file = parse_env_file(env_file)
+ assert get_parse_env_file == {'USER': 'jdoe', 'PASS': 'sec==ret'}
+ os.unlink(env_file)
+
+ def test_parse_env_file_commented_line(self):
+ env_file = self.generate_tempfile(
+ file_content='USER=jdoe\n#PASS=secret')
+ get_parse_env_file = parse_env_file(env_file)
+ assert get_parse_env_file == {'USER': 'jdoe'}
+ os.unlink(env_file)
+
+ def test_parse_env_file_newline(self):
+ env_file = self.generate_tempfile(
+ file_content='\nUSER=jdoe\n\n\nPASS=secret')
+ get_parse_env_file = parse_env_file(env_file)
+ assert get_parse_env_file == {'USER': 'jdoe', 'PASS': 'secret'}
+ os.unlink(env_file)
+
+ def test_parse_env_file_invalid_line(self):
+ env_file = self.generate_tempfile(
+ file_content='USER jdoe')
+ with pytest.raises(DockerException):
+ parse_env_file(env_file)
+ os.unlink(env_file)
+
+
+class ParseHostTest(unittest.TestCase):
+ def test_parse_host(self):
+ invalid_hosts = [
+ '0.0.0.0',
+ 'tcp://',
+ 'udp://127.0.0.1',
+ 'udp://127.0.0.1:2375',
+ 'ssh://:22/path',
+ 'tcp://netloc:3333/path?q=1',
+ 'unix:///sock/path#fragment',
+ 'https://netloc:3333/path;params',
+ 'ssh://:clearpassword@host:22',
+ ]
+
+ valid_hosts = {
+ '0.0.0.1:5555': 'http://0.0.0.1:5555',
+ ':6666': 'http://127.0.0.1:6666',
+ 'tcp://:7777': 'http://127.0.0.1:7777',
+ 'http://:7777': 'http://127.0.0.1:7777',
+ 'https://kokia.jp:2375': 'https://kokia.jp:2375',
+ 'unix:///var/run/docker.sock': 'http+unix:///var/run/docker.sock',
+ 'unix://': 'http+unix:///var/run/docker.sock',
+ '12.234.45.127:2375/docker/engine': (
+ 'http://12.234.45.127:2375/docker/engine'
+ ),
+ 'somehost.net:80/service/swarm': (
+ 'http://somehost.net:80/service/swarm'
+ ),
+ 'npipe:////./pipe/docker_engine': 'npipe:////./pipe/docker_engine',
+ '[fd12::82d1]:2375': 'http://[fd12::82d1]:2375',
+ 'https://[fd12:5672::12aa]:1090': 'https://[fd12:5672::12aa]:1090',
+ '[fd12::82d1]:2375/docker/engine': (
+ 'http://[fd12::82d1]:2375/docker/engine'
+ ),
+ 'ssh://[fd12::82d1]': 'ssh://[fd12::82d1]:22',
+ 'ssh://user@[fd12::82d1]:8765': 'ssh://user@[fd12::82d1]:8765',
+ 'ssh://': 'ssh://127.0.0.1:22',
+ 'ssh://user@localhost:22': 'ssh://user@localhost:22',
+ 'ssh://user@remote': 'ssh://user@remote:22',
+ }
+
+ for host in invalid_hosts:
+ msg = 'Should have failed to parse invalid host: {0}'.format(host)
+ with self.assertRaises(DockerException, msg=msg):
+ parse_host(host, None)
+
+ for host, expected in valid_hosts.items():
+ self.assertEqual(
+ parse_host(host, None),
+ expected,
+ msg='Failed to parse valid host: {0}'.format(host),
+ )
+
+ def test_parse_host_empty_value(self):
+ unix_socket = 'http+unix:///var/run/docker.sock'
+ npipe = 'npipe:////./pipe/docker_engine'
+
+ for val in [None, '']:
+ assert parse_host(val, is_win32=False) == unix_socket
+ assert parse_host(val, is_win32=True) == npipe
+
+ def test_parse_host_tls(self):
+ host_value = 'myhost.docker.net:3348'
+ expected_result = 'https://myhost.docker.net:3348'
+ assert parse_host(host_value, tls=True) == expected_result
+
+ def test_parse_host_tls_tcp_proto(self):
+ host_value = 'tcp://myhost.docker.net:3348'
+ expected_result = 'https://myhost.docker.net:3348'
+ assert parse_host(host_value, tls=True) == expected_result
+
+ def test_parse_host_trailing_slash(self):
+ host_value = 'tcp://myhost.docker.net:2376/'
+ expected_result = 'http://myhost.docker.net:2376'
+ assert parse_host(host_value) == expected_result
+
+
+class ParseRepositoryTagTest(unittest.TestCase):
+ sha = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
+
+ def test_index_image_no_tag(self):
+ assert parse_repository_tag("root") == ("root", None)
+
+ def test_index_image_tag(self):
+ assert parse_repository_tag("root:tag") == ("root", "tag")
+
+ def test_index_user_image_no_tag(self):
+ assert parse_repository_tag("user/repo") == ("user/repo", None)
+
+ def test_index_user_image_tag(self):
+ assert parse_repository_tag("user/repo:tag") == ("user/repo", "tag")
+
+ def test_private_reg_image_no_tag(self):
+ assert parse_repository_tag("url:5000/repo") == ("url:5000/repo", None)
+
+ def test_private_reg_image_tag(self):
+ assert parse_repository_tag("url:5000/repo:tag") == (
+ "url:5000/repo", "tag"
+ )
+
+ def test_index_image_sha(self):
+ assert parse_repository_tag("root@sha256:{sha}".format(sha=self.sha)) == (
+ "root", "sha256:{sha}".format(sha=self.sha)
+ )
+
+ def test_private_reg_image_sha(self):
+ assert parse_repository_tag(
+ "url:5000/repo@sha256:{sha}".format(sha=self.sha)
+ ) == ("url:5000/repo", "sha256:{sha}".format(sha=self.sha))
+
+
+class ParseDeviceTest(unittest.TestCase):
+ def test_dict(self):
+ devices = parse_devices([{
+ 'PathOnHost': '/dev/sda1',
+ 'PathInContainer': '/dev/mnt1',
+ 'CgroupPermissions': 'r'
+ }])
+ assert devices[0] == {
+ 'PathOnHost': '/dev/sda1',
+ 'PathInContainer': '/dev/mnt1',
+ 'CgroupPermissions': 'r'
+ }
+
+ def test_partial_string_definition(self):
+ devices = parse_devices(['/dev/sda1'])
+ assert devices[0] == {
+ 'PathOnHost': '/dev/sda1',
+ 'PathInContainer': '/dev/sda1',
+ 'CgroupPermissions': 'rwm'
+ }
+
+ def test_permissionless_string_definition(self):
+ devices = parse_devices(['/dev/sda1:/dev/mnt1'])
+ assert devices[0] == {
+ 'PathOnHost': '/dev/sda1',
+ 'PathInContainer': '/dev/mnt1',
+ 'CgroupPermissions': 'rwm'
+ }
+
+ def test_full_string_definition(self):
+ devices = parse_devices(['/dev/sda1:/dev/mnt1:r'])
+ assert devices[0] == {
+ 'PathOnHost': '/dev/sda1',
+ 'PathInContainer': '/dev/mnt1',
+ 'CgroupPermissions': 'r'
+ }
+
+ def test_hybrid_list(self):
+ devices = parse_devices([
+ '/dev/sda1:/dev/mnt1:rw',
+ {
+ 'PathOnHost': '/dev/sda2',
+ 'PathInContainer': '/dev/mnt2',
+ 'CgroupPermissions': 'r'
+ }
+ ])
+
+ assert devices[0] == {
+ 'PathOnHost': '/dev/sda1',
+ 'PathInContainer': '/dev/mnt1',
+ 'CgroupPermissions': 'rw'
+ }
+ assert devices[1] == {
+ 'PathOnHost': '/dev/sda2',
+ 'PathInContainer': '/dev/mnt2',
+ 'CgroupPermissions': 'r'
+ }
+
+
+class ParseBytesTest(unittest.TestCase):
+ def test_parse_bytes_valid(self):
+ assert parse_bytes("512MB") == 536870912
+ assert parse_bytes("512M") == 536870912
+ assert parse_bytes("512m") == 536870912
+
+ def test_parse_bytes_invalid(self):
+ with pytest.raises(DockerException):
+ parse_bytes("512MK")
+ with pytest.raises(DockerException):
+ parse_bytes("512L")
+ with pytest.raises(DockerException):
+ parse_bytes("127.0.0.1K")
+
+ def test_parse_bytes_float(self):
+ assert parse_bytes("1.5k") == 1536
+
+
+class UtilsTest(unittest.TestCase):
+ longMessage = True
+
+ def test_convert_filters(self):
+ tests = [
+ ({'dangling': True}, '{"dangling": ["true"]}'),
+ ({'dangling': "true"}, '{"dangling": ["true"]}'),
+ ({'exited': 0}, '{"exited": ["0"]}'),
+ ({'exited': [0, 1]}, '{"exited": ["0", "1"]}'),
+ ]
+
+ for filters, expected in tests:
+ assert convert_filters(filters) == expected
+
+ def test_decode_json_header(self):
+ obj = {'a': 'b', 'c': 1}
+ data = None
+ if PY3:
+ data = base64.urlsafe_b64encode(bytes(json.dumps(obj), 'utf-8'))
+ else:
+ data = base64.urlsafe_b64encode(json.dumps(obj))
+ decoded_data = decode_json_header(data)
+ assert obj == decoded_data
+
+
+class SplitCommandTest(unittest.TestCase):
+ def test_split_command_with_unicode(self):
+ assert split_command(u'echo μμ') == ['echo', 'μμ']
+
+ @pytest.mark.skipif(PY3, reason="shlex doesn't support bytes in py3")
+ def test_split_command_with_bytes(self):
+ assert split_command('echo μμ') == ['echo', 'μμ']
+
+
+class FormatEnvironmentTest(unittest.TestCase):
+ def test_format_env_binary_unicode_value(self):
+ env_dict = {
+ 'ARTIST_NAME': b'\xec\x86\xa1\xec\xa7\x80\xec\x9d\x80'
+ }
+ assert format_environment(env_dict) == [u'ARTIST_NAME=송지은']
+
+ def test_format_env_no_value(self):
+ env_dict = {
+ 'FOO': None,
+ 'BAR': '',
+ }
+ assert sorted(format_environment(env_dict)) == ['BAR=', 'FOO']
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/ca.pem b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/ca.pem
new file mode 100644
index 00000000..5c7093a3
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/ca.pem
@@ -0,0 +1,7 @@
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/cert.pem b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/cert.pem
new file mode 100644
index 00000000..5c7093a3
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/cert.pem
@@ -0,0 +1,7 @@
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/key.pem b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/key.pem
new file mode 100644
index 00000000..5c7093a3
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/_api/utils/testdata/certs/key.pem
@@ -0,0 +1,7 @@
+# This code is part of the Ansible collection community.docker, but is an independent component.
+# This particular file, and this file only, is based on the Docker SDK for Python (https://github.com/docker/docker-py/)
+#
+# Copyright (c) 2016-2022 Docker, Inc.
+#
+# It is licensed under the Apache 2.0 license (see LICENSES/Apache-2.0.txt in this collection)
+# SPDX-License-Identifier: Apache-2.0
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test__scramble.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test__scramble.py
new file mode 100644
index 00000000..ff004306
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test__scramble.py
@@ -0,0 +1,28 @@
+# Copyright 2022 Red Hat | Ansible
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible_collections.community.docker.plugins.module_utils._scramble import (
+ scramble,
+ unscramble,
+)
+
+
+@pytest.mark.parametrize('plaintext, key, scrambled', [
+ (u'', b'0', '=S='),
+ (u'hello', b'\x00', '=S=aGVsbG8='),
+ (u'hello', b'\x01', '=S=aWRtbW4='),
+])
+def test_scramble_unscramble(plaintext, key, scrambled):
+ scrambled_ = scramble(plaintext, key)
+ print('{0!r} == {1!r}'.format(scrambled_, scrambled))
+ assert scrambled_ == scrambled
+
+ plaintext_ = unscramble(scrambled, key)
+ print('{0!r} == {1!r}'.format(plaintext_, plaintext))
+ assert plaintext_ == plaintext
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_copy.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_copy.py
new file mode 100644
index 00000000..3668573b
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_copy.py
@@ -0,0 +1,77 @@
+# Copyright 2022 Red Hat | Ansible
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible_collections.community.docker.plugins.module_utils.copy import (
+ _stream_generator_to_fileobj,
+)
+
+
+def _simple_generator(sequence):
+ for elt in sequence:
+ yield elt
+
+
+@pytest.mark.parametrize('chunks, read_sizes', [
+ (
+ [
+ (1, b'1'),
+ (1, b'2'),
+ (1, b'3'),
+ (1, b'4'),
+ ],
+ [
+ 1,
+ 2,
+ 3,
+ ]
+ ),
+ (
+ [
+ (1, b'123'),
+ (1, b'456'),
+ (1, b'789'),
+ ],
+ [
+ 1,
+ 4,
+ 2,
+ 2,
+ 2,
+ ]
+ ),
+ (
+ [
+ (10 * 1024 * 1024, b'0'),
+ (10 * 1024 * 1024, b'1'),
+ ],
+ [
+ 1024 * 1024 - 5,
+ 5 * 1024 * 1024 - 3,
+ 10 * 1024 * 1024 - 2,
+ 2 * 1024 * 1024 - 1,
+ 2 * 1024 * 1024 + 5 + 3 + 2 + 1,
+ ]
+ ),
+])
+def test__stream_generator_to_fileobj(chunks, read_sizes):
+ chunks = [count * data for count, data in chunks]
+ stream = _simple_generator(chunks)
+ expected = b''.join(chunks)
+
+ buffer = b''
+ totally_read = 0
+ f = _stream_generator_to_fileobj(stream)
+ for read_size in read_sizes:
+ chunk = f.read(read_size)
+ assert len(chunk) == min(read_size, len(expected) - len(buffer))
+ buffer += chunk
+ totally_read += read_size
+
+ assert buffer == expected[:len(buffer)]
+ assert min(totally_read, len(expected)) == len(buffer)
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_image_archive.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_image_archive.py
new file mode 100644
index 00000000..10573b96
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_image_archive.py
@@ -0,0 +1,94 @@
+# Copyright 2022 Red Hat | Ansible
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+import tarfile
+
+from ansible_collections.community.docker.plugins.module_utils.image_archive import (
+ api_image_id,
+ archived_image_manifest,
+ ImageArchiveInvalidException
+)
+
+from ..test_support.docker_image_archive_stubbing import (
+ write_imitation_archive,
+ write_imitation_archive_with_manifest,
+ write_irrelevant_tar,
+)
+
+
+@pytest.fixture
+def tar_file_name(tmpdir):
+ '''
+ Return the name of a non-existing tar file in an existing temporary directory.
+ '''
+
+ # Cast to str required by Python 2.x
+ return str(tmpdir.join('foo.tar'))
+
+
+@pytest.mark.parametrize('expected, value', [
+ ('sha256:foo', 'foo'),
+ ('sha256:bar', 'bar')
+])
+def test_api_image_id_from_archive_id(expected, value):
+ assert api_image_id(value) == expected
+
+
+def test_archived_image_manifest_extracts(tar_file_name):
+ expected_id = "abcde12345"
+ expected_tags = ["foo:latest", "bar:v1"]
+
+ write_imitation_archive(tar_file_name, expected_id, expected_tags)
+
+ actual = archived_image_manifest(tar_file_name)
+
+ assert actual.image_id == expected_id
+ assert actual.repo_tags == expected_tags
+
+
+def test_archived_image_manifest_extracts_nothing_when_file_not_present(tar_file_name):
+ image_id = archived_image_manifest(tar_file_name)
+
+ assert image_id is None
+
+
+def test_archived_image_manifest_raises_when_file_not_a_tar():
+ try:
+ archived_image_manifest(__file__)
+ raise AssertionError()
+ except ImageArchiveInvalidException as e:
+ assert isinstance(e.cause, tarfile.ReadError)
+ assert str(__file__) in str(e)
+
+
+def test_archived_image_manifest_raises_when_tar_missing_manifest(tar_file_name):
+ write_irrelevant_tar(tar_file_name)
+
+ try:
+ archived_image_manifest(tar_file_name)
+ raise AssertionError()
+ except ImageArchiveInvalidException as e:
+ assert isinstance(e.cause, KeyError)
+ assert 'manifest.json' in str(e.cause)
+
+
+def test_archived_image_manifest_raises_when_manifest_missing_id(tar_file_name):
+ manifest = [
+ {
+ 'foo': 'bar'
+ }
+ ]
+
+ write_imitation_archive_with_manifest(tar_file_name, manifest)
+
+ try:
+ archived_image_manifest(tar_file_name)
+ raise AssertionError()
+ except ImageArchiveInvalidException as e:
+ assert isinstance(e.cause, KeyError)
+ assert 'Config' in str(e.cause)
diff --git a/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_util.py b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_util.py
new file mode 100644
index 00000000..c7d36212
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/module_utils/test_util.py
@@ -0,0 +1,522 @@
+# Copyright (c) Ansible Project
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible_collections.community.docker.plugins.module_utils.util import (
+ compare_dict_allow_more_present,
+ compare_generic,
+ convert_duration_to_nanosecond,
+ parse_healthcheck
+)
+
+DICT_ALLOW_MORE_PRESENT = (
+ {
+ 'av': {},
+ 'bv': {'a': 1},
+ 'result': True
+ },
+ {
+ 'av': {'a': 1},
+ 'bv': {'a': 1, 'b': 2},
+ 'result': True
+ },
+ {
+ 'av': {'a': 1},
+ 'bv': {'b': 2},
+ 'result': False
+ },
+ {
+ 'av': {'a': 1},
+ 'bv': {'a': None, 'b': 1},
+ 'result': False
+ },
+ {
+ 'av': {'a': None},
+ 'bv': {'b': 1},
+ 'result': False
+ },
+)
+
+COMPARE_GENERIC = [
+ ########################################################################################
+ # value
+ {
+ 'a': 1,
+ 'b': 2,
+ 'method': 'strict',
+ 'type': 'value',
+ 'result': False
+ },
+ {
+ 'a': 'hello',
+ 'b': 'hello',
+ 'method': 'strict',
+ 'type': 'value',
+ 'result': True
+ },
+ {
+ 'a': None,
+ 'b': 'hello',
+ 'method': 'strict',
+ 'type': 'value',
+ 'result': False
+ },
+ {
+ 'a': None,
+ 'b': None,
+ 'method': 'strict',
+ 'type': 'value',
+ 'result': True
+ },
+ {
+ 'a': 1,
+ 'b': 2,
+ 'method': 'ignore',
+ 'type': 'value',
+ 'result': True
+ },
+ {
+ 'a': None,
+ 'b': 2,
+ 'method': 'ignore',
+ 'type': 'value',
+ 'result': True
+ },
+ ########################################################################################
+ # list
+ {
+ 'a': [
+ 'x',
+ ],
+ 'b': [
+ 'y',
+ ],
+ 'method': 'strict',
+ 'type': 'list',
+ 'result': False
+ },
+ {
+ 'a': [
+ 'x',
+ ],
+ 'b': [
+ 'x',
+ 'x',
+ ],
+ 'method': 'strict',
+ 'type': 'list',
+ 'result': False
+ },
+ {
+ 'a': [
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ ],
+ 'method': 'strict',
+ 'type': 'list',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'y',
+ 'x',
+ ],
+ 'method': 'strict',
+ 'type': 'list',
+ 'result': False
+ },
+ {
+ 'a': [
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'x',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'list',
+ 'result': False
+ },
+ {
+ 'a': [
+ 'x',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'list',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'list',
+ 'result': False
+ },
+ {
+ 'a': [
+ 'x',
+ 'z',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ 'x',
+ 'z',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'list',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'y',
+ 'x',
+ ],
+ 'method': 'ignore',
+ 'type': 'list',
+ 'result': True
+ },
+ ########################################################################################
+ # set
+ {
+ 'a': [
+ 'x',
+ ],
+ 'b': [
+ 'y',
+ ],
+ 'method': 'strict',
+ 'type': 'set',
+ 'result': False
+ },
+ {
+ 'a': [
+ 'x',
+ ],
+ 'b': [
+ 'x',
+ 'x',
+ ],
+ 'method': 'strict',
+ 'type': 'set',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ ],
+ 'method': 'strict',
+ 'type': 'set',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'y',
+ 'x',
+ ],
+ 'method': 'strict',
+ 'type': 'set',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'x',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'set',
+ 'result': False
+ },
+ {
+ 'a': [
+ 'x',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'set',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'x',
+ 'y',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'set',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'z',
+ ],
+ 'b': [
+ 'x',
+ 'y',
+ 'x',
+ 'z',
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'set',
+ 'result': True
+ },
+ {
+ 'a': [
+ 'x',
+ 'a',
+ ],
+ 'b': [
+ 'y',
+ 'z',
+ ],
+ 'method': 'ignore',
+ 'type': 'set',
+ 'result': True
+ },
+ ########################################################################################
+ # set(dict)
+ {
+ 'a': [
+ {'x': 1},
+ ],
+ 'b': [
+ {'y': 1},
+ ],
+ 'method': 'strict',
+ 'type': 'set(dict)',
+ 'result': False
+ },
+ {
+ 'a': [
+ {'x': 1},
+ ],
+ 'b': [
+ {'x': 1},
+ ],
+ 'method': 'strict',
+ 'type': 'set(dict)',
+ 'result': True
+ },
+ {
+ 'a': [
+ {'x': 1},
+ ],
+ 'b': [
+ {'x': 1, 'y': 2},
+ ],
+ 'method': 'strict',
+ 'type': 'set(dict)',
+ 'result': True
+ },
+ {
+ 'a': [
+ {'x': 1},
+ {'x': 2, 'y': 3},
+ ],
+ 'b': [
+ {'x': 1},
+ {'x': 2, 'y': 3},
+ ],
+ 'method': 'strict',
+ 'type': 'set(dict)',
+ 'result': True
+ },
+ {
+ 'a': [
+ {'x': 1},
+ ],
+ 'b': [
+ {'x': 1, 'z': 2},
+ {'x': 2, 'y': 3},
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'set(dict)',
+ 'result': True
+ },
+ {
+ 'a': [
+ {'x': 1, 'y': 2},
+ ],
+ 'b': [
+ {'x': 1},
+ {'x': 2, 'y': 3},
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'set(dict)',
+ 'result': False
+ },
+ {
+ 'a': [
+ {'x': 1, 'y': 3},
+ ],
+ 'b': [
+ {'x': 1},
+ {'x': 1, 'y': 3, 'z': 4},
+ ],
+ 'method': 'allow_more_present',
+ 'type': 'set(dict)',
+ 'result': True
+ },
+ {
+ 'a': [
+ {'x': 1},
+ {'x': 2, 'y': 3},
+ ],
+ 'b': [
+ {'x': 1},
+ ],
+ 'method': 'ignore',
+ 'type': 'set(dict)',
+ 'result': True
+ },
+ ########################################################################################
+ # dict
+ {
+ 'a': {'x': 1},
+ 'b': {'y': 1},
+ 'method': 'strict',
+ 'type': 'dict',
+ 'result': False
+ },
+ {
+ 'a': {'x': 1},
+ 'b': {'x': 1, 'y': 2},
+ 'method': 'strict',
+ 'type': 'dict',
+ 'result': False
+ },
+ {
+ 'a': {'x': 1},
+ 'b': {'x': 1},
+ 'method': 'strict',
+ 'type': 'dict',
+ 'result': True
+ },
+ {
+ 'a': {'x': 1, 'z': 2},
+ 'b': {'x': 1, 'y': 2},
+ 'method': 'strict',
+ 'type': 'dict',
+ 'result': False
+ },
+ {
+ 'a': {'x': 1, 'z': 2},
+ 'b': {'x': 1, 'y': 2},
+ 'method': 'ignore',
+ 'type': 'dict',
+ 'result': True
+ },
+] + [{
+ 'a': entry['av'],
+ 'b': entry['bv'],
+ 'method': 'allow_more_present',
+ 'type': 'dict',
+ 'result': entry['result']
+} for entry in DICT_ALLOW_MORE_PRESENT]
+
+
+@pytest.mark.parametrize("entry", DICT_ALLOW_MORE_PRESENT)
+def test_dict_allow_more_present(entry):
+ assert compare_dict_allow_more_present(entry['av'], entry['bv']) == entry['result']
+
+
+@pytest.mark.parametrize("entry", COMPARE_GENERIC)
+def test_compare_generic(entry):
+ assert compare_generic(entry['a'], entry['b'], entry['method'], entry['type']) == entry['result']
+
+
+def test_convert_duration_to_nanosecond():
+ nanoseconds = convert_duration_to_nanosecond('5s')
+ assert nanoseconds == 5000000000
+ nanoseconds = convert_duration_to_nanosecond('1m5s')
+ assert nanoseconds == 65000000000
+ with pytest.raises(ValueError):
+ convert_duration_to_nanosecond([1, 2, 3])
+ with pytest.raises(ValueError):
+ convert_duration_to_nanosecond('10x')
+
+
+def test_parse_healthcheck():
+ result, disabled = parse_healthcheck({
+ 'test': 'sleep 1',
+ 'interval': '1s',
+ })
+ assert disabled is False
+ assert result == {
+ 'test': ['CMD-SHELL', 'sleep 1'],
+ 'interval': 1000000000
+ }
+
+ result, disabled = parse_healthcheck({
+ 'test': ['NONE'],
+ })
+ assert result is None
+ assert disabled
+
+ result, disabled = parse_healthcheck({
+ 'test': 'sleep 1',
+ 'interval': '1s423ms'
+ })
+ assert result == {
+ 'test': ['CMD-SHELL', 'sleep 1'],
+ 'interval': 1423000000
+ }
+ assert disabled is False
+
+ result, disabled = parse_healthcheck({
+ 'test': 'sleep 1',
+ 'interval': '1h1m2s3ms4us'
+ })
+ assert result == {
+ 'test': ['CMD-SHELL', 'sleep 1'],
+ 'interval': 3662003004000
+ }
+ assert disabled is False
diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/conftest.py b/ansible_collections/community/docker/tests/unit/plugins/modules/conftest.py
new file mode 100644
index 00000000..0ed3dd44
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/modules/conftest.py
@@ -0,0 +1,32 @@
+# Copyright (c) 2017 Ansible Project
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+
+import pytest
+
+from ansible.module_utils.six import string_types
+from ansible.module_utils.common.text.converters import to_bytes
+from ansible.module_utils.common._collections_compat import MutableMapping
+
+
+@pytest.fixture
+def patch_ansible_module(request, mocker):
+ if isinstance(request.param, string_types):
+ args = request.param
+ elif isinstance(request.param, MutableMapping):
+ if 'ANSIBLE_MODULE_ARGS' not in request.param:
+ request.param = {'ANSIBLE_MODULE_ARGS': request.param}
+ if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']:
+ request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp'
+ if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']:
+ request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False
+ args = json.dumps(request.param)
+ else:
+ raise Exception('Malformed data to the patch_ansible_module pytest fixture')
+
+ mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args))
diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_image.py b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_image.py
new file mode 100644
index 00000000..3401837f
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_image.py
@@ -0,0 +1,114 @@
+# Copyright 2022 Red Hat | Ansible
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible_collections.community.docker.plugins.modules.docker_image import ImageManager
+
+from ansible_collections.community.docker.plugins.module_utils.image_archive import api_image_id
+
+from ..test_support.docker_image_archive_stubbing import (
+ write_imitation_archive,
+ write_irrelevant_tar,
+)
+
+
+def assert_no_logging(msg):
+ raise AssertionError('Should not have logged anything but logged %s' % msg)
+
+
+def capture_logging(messages):
+ def capture(msg):
+ messages.append(msg)
+
+ return capture
+
+
+@pytest.fixture
+def tar_file_name(tmpdir):
+ """
+ Return the name of a non-existing tar file in an existing temporary directory.
+ """
+
+ # Cast to str required by Python 2.x
+ return str(tmpdir.join('foo.tar'))
+
+
+def test_archived_image_action_when_missing(tar_file_name):
+ fake_name = 'a:latest'
+ fake_id = 'a1'
+
+ expected = 'Archived image %s to %s, since none present' % (fake_name, tar_file_name)
+
+ actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, fake_name, api_image_id(fake_id))
+
+ assert actual == expected
+
+
+def test_archived_image_action_when_current(tar_file_name):
+ fake_name = 'b:latest'
+ fake_id = 'b2'
+
+ write_imitation_archive(tar_file_name, fake_id, [fake_name])
+
+ actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, fake_name, api_image_id(fake_id))
+
+ assert actual is None
+
+
+def test_archived_image_action_when_invalid(tar_file_name):
+ fake_name = 'c:1.2.3'
+ fake_id = 'c3'
+
+ write_irrelevant_tar(tar_file_name)
+
+ expected = 'Archived image %s to %s, overwriting an unreadable archive file' % (fake_name, tar_file_name)
+
+ actual_log = []
+ actual = ImageManager.archived_image_action(
+ capture_logging(actual_log),
+ tar_file_name,
+ fake_name,
+ api_image_id(fake_id)
+ )
+
+ assert actual == expected
+
+ assert len(actual_log) == 1
+ assert actual_log[0].startswith('Unable to extract manifest summary from archive')
+
+
+def test_archived_image_action_when_obsolete_by_id(tar_file_name):
+ fake_name = 'd:0.0.1'
+ old_id = 'e5'
+ new_id = 'd4'
+
+ write_imitation_archive(tar_file_name, old_id, [fake_name])
+
+ expected = 'Archived image %s to %s, overwriting archive with image %s named %s' % (
+ fake_name, tar_file_name, old_id, fake_name
+ )
+ actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, fake_name, api_image_id(new_id))
+
+ assert actual == expected
+
+
+def test_archived_image_action_when_obsolete_by_name(tar_file_name):
+ old_name = 'hi'
+ new_name = 'd:0.0.1'
+ fake_id = 'd4'
+
+ write_imitation_archive(tar_file_name, fake_id, [old_name])
+
+ expected = 'Archived image %s to %s, overwriting archive with image %s named %s' % (
+ new_name, tar_file_name, fake_id, old_name
+ )
+ actual = ImageManager.archived_image_action(assert_no_logging, tar_file_name, new_name, api_image_id(fake_id))
+
+ print('actual : %s', actual)
+ print('expected : %s', expected)
+ assert actual == expected
diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_network.py b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_network.py
new file mode 100644
index 00000000..a937c6db
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_network.py
@@ -0,0 +1,35 @@
+# Copyright (c) Ansible Project
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+"""Unit tests for docker_network."""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+from ansible_collections.community.docker.plugins.modules.docker_network import validate_cidr
+
+
+@pytest.mark.parametrize("cidr,expected", [
+ ('192.168.0.1/16', 'ipv4'),
+ ('192.168.0.1/24', 'ipv4'),
+ ('192.168.0.1/32', 'ipv4'),
+ ('fdd1:ac8c:0557:7ce2::/64', 'ipv6'),
+ ('fdd1:ac8c:0557:7ce2::/128', 'ipv6'),
+])
+def test_validate_cidr_positives(cidr, expected):
+ assert validate_cidr(cidr) == expected
+
+
+@pytest.mark.parametrize("cidr", [
+ '192.168.0.1',
+ '192.168.0.1/34',
+ '192.168.0.1/asd',
+ 'fdd1:ac8c:0557:7ce2::',
+])
+def test_validate_cidr_negatives(cidr):
+ with pytest.raises(ValueError) as e:
+ validate_cidr(cidr)
+ assert '"{0}" is not a valid CIDR'.format(cidr) == str(e.value)
diff --git a/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_swarm_service.py b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_swarm_service.py
new file mode 100644
index 00000000..1cef623b
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/modules/test_docker_swarm_service.py
@@ -0,0 +1,514 @@
+# Copyright (c) Ansible Project
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import pytest
+
+
+class APIErrorMock(Exception):
+ def __init__(self, message, response=None, explanation=None):
+ self.message = message
+ self.response = response
+ self.explanation = explanation
+
+
+@pytest.fixture(autouse=True)
+def docker_module_mock(mocker):
+ docker_module_mock = mocker.MagicMock()
+ docker_utils_module_mock = mocker.MagicMock()
+ docker_errors_module_mock = mocker.MagicMock()
+ docker_errors_module_mock.APIError = APIErrorMock
+ mock_modules = {
+ 'docker': docker_module_mock,
+ 'docker.utils': docker_utils_module_mock,
+ 'docker.errors': docker_errors_module_mock,
+ }
+ return mocker.patch.dict('sys.modules', **mock_modules)
+
+
+@pytest.fixture(autouse=True)
+def docker_swarm_service():
+ from ansible_collections.community.docker.plugins.modules import docker_swarm_service
+
+ return docker_swarm_service
+
+
+def test_retry_on_out_of_sequence_error(mocker, docker_swarm_service):
+ run_mock = mocker.MagicMock(
+ side_effect=APIErrorMock(
+ message='',
+ response=None,
+ explanation='rpc error: code = Unknown desc = update out of sequence',
+ )
+ )
+ manager = docker_swarm_service.DockerServiceManager(client=None)
+ manager.run = run_mock
+ with pytest.raises(APIErrorMock):
+ manager.run_safe()
+ assert run_mock.call_count == 3
+
+
+def test_no_retry_on_general_api_error(mocker, docker_swarm_service):
+ run_mock = mocker.MagicMock(
+ side_effect=APIErrorMock(message='', response=None, explanation='some error')
+ )
+ manager = docker_swarm_service.DockerServiceManager(client=None)
+ manager.run = run_mock
+ with pytest.raises(APIErrorMock):
+ manager.run_safe()
+ assert run_mock.call_count == 1
+
+
+def test_get_docker_environment(mocker, docker_swarm_service):
+ env_file_result = {'TEST1': 'A', 'TEST2': 'B', 'TEST3': 'C'}
+ env_dict = {'TEST3': 'CC', 'TEST4': 'D'}
+ env_string = "TEST3=CC,TEST4=D"
+
+ env_list = ['TEST3=CC', 'TEST4=D']
+ expected_result = sorted(['TEST1=A', 'TEST2=B', 'TEST3=CC', 'TEST4=D'])
+ mocker.patch.object(
+ docker_swarm_service, 'parse_env_file', return_value=env_file_result
+ )
+ mocker.patch.object(
+ docker_swarm_service,
+ 'format_environment',
+ side_effect=lambda d: ['{0}={1}'.format(key, value) for key, value in d.items()],
+ )
+ # Test with env dict and file
+ result = docker_swarm_service.get_docker_environment(
+ env_dict, env_files=['dummypath']
+ )
+ assert result == expected_result
+ # Test with env list and file
+ result = docker_swarm_service.get_docker_environment(
+ env_list,
+ env_files=['dummypath']
+ )
+ assert result == expected_result
+ # Test with env string and file
+ result = docker_swarm_service.get_docker_environment(
+ env_string, env_files=['dummypath']
+ )
+ assert result == expected_result
+
+ assert result == expected_result
+ # Test with empty env
+ result = docker_swarm_service.get_docker_environment(
+ [], env_files=None
+ )
+ assert result == []
+ # Test with empty env_files
+ result = docker_swarm_service.get_docker_environment(
+ None, env_files=[]
+ )
+ assert result == []
+
+
+def test_get_nanoseconds_from_raw_option(docker_swarm_service):
+ value = docker_swarm_service.get_nanoseconds_from_raw_option('test', None)
+ assert value is None
+
+ value = docker_swarm_service.get_nanoseconds_from_raw_option('test', '1m30s535ms')
+ assert value == 90535000000
+
+ value = docker_swarm_service.get_nanoseconds_from_raw_option('test', 10000000000)
+ assert value == 10000000000
+
+ with pytest.raises(ValueError):
+ docker_swarm_service.get_nanoseconds_from_raw_option('test', [])
+
+
+def test_has_dict_changed(docker_swarm_service):
+ assert not docker_swarm_service.has_dict_changed(
+ {"a": 1},
+ {"a": 1},
+ )
+ assert not docker_swarm_service.has_dict_changed(
+ {"a": 1},
+ {"a": 1, "b": 2}
+ )
+ assert docker_swarm_service.has_dict_changed(
+ {"a": 1},
+ {"a": 2, "b": 2}
+ )
+ assert docker_swarm_service.has_dict_changed(
+ {"a": 1, "b": 1},
+ {"a": 1}
+ )
+ assert not docker_swarm_service.has_dict_changed(
+ None,
+ {"a": 2, "b": 2}
+ )
+ assert docker_swarm_service.has_dict_changed(
+ {},
+ {"a": 2, "b": 2}
+ )
+ assert docker_swarm_service.has_dict_changed(
+ {"a": 1},
+ {}
+ )
+ assert docker_swarm_service.has_dict_changed(
+ {"a": 1},
+ None
+ )
+ assert not docker_swarm_service.has_dict_changed(
+ {},
+ {}
+ )
+ assert not docker_swarm_service.has_dict_changed(
+ None,
+ None
+ )
+ assert not docker_swarm_service.has_dict_changed(
+ {},
+ None
+ )
+ assert not docker_swarm_service.has_dict_changed(
+ None,
+ {}
+ )
+
+
+def test_has_list_changed(docker_swarm_service):
+
+ # List comparisons without dictionaries
+ # I could improve the indenting, but pycodestyle wants this instead
+ assert not docker_swarm_service.has_list_changed(None, None)
+ assert not docker_swarm_service.has_list_changed(None, [])
+ assert not docker_swarm_service.has_list_changed(None, [1, 2])
+
+ assert not docker_swarm_service.has_list_changed([], None)
+ assert not docker_swarm_service.has_list_changed([], [])
+ assert docker_swarm_service.has_list_changed([], [1, 2])
+
+ assert docker_swarm_service.has_list_changed([1, 2], None)
+ assert docker_swarm_service.has_list_changed([1, 2], [])
+
+ assert docker_swarm_service.has_list_changed([1, 2, 3], [1, 2])
+ assert docker_swarm_service.has_list_changed([1, 2], [1, 2, 3])
+
+ # Check list sorting
+ assert not docker_swarm_service.has_list_changed([1, 2], [2, 1])
+ assert docker_swarm_service.has_list_changed(
+ [1, 2],
+ [2, 1],
+ sort_lists=False
+ )
+
+ # Check type matching
+ assert docker_swarm_service.has_list_changed([None, 1], [2, 1])
+ assert docker_swarm_service.has_list_changed([2, 1], [None, 1])
+ assert docker_swarm_service.has_list_changed(
+ "command --with args",
+ ['command', '--with', 'args']
+ )
+ assert docker_swarm_service.has_list_changed(
+ ['sleep', '3400'],
+ [u'sleep', u'3600'],
+ sort_lists=False
+ )
+
+ # List comparisons with dictionaries
+ assert not docker_swarm_service.has_list_changed(
+ [{'a': 1}],
+ [{'a': 1}],
+ sort_key='a'
+ )
+
+ assert not docker_swarm_service.has_list_changed(
+ [{'a': 1}, {'a': 2}],
+ [{'a': 1}, {'a': 2}],
+ sort_key='a'
+ )
+
+ with pytest.raises(Exception):
+ docker_swarm_service.has_list_changed(
+ [{'a': 1}, {'a': 2}],
+ [{'a': 1}, {'a': 2}]
+ )
+
+ # List sort checking with sort key
+ assert not docker_swarm_service.has_list_changed(
+ [{'a': 1}, {'a': 2}],
+ [{'a': 2}, {'a': 1}],
+ sort_key='a'
+ )
+ assert docker_swarm_service.has_list_changed(
+ [{'a': 1}, {'a': 2}],
+ [{'a': 2}, {'a': 1}],
+ sort_lists=False
+ )
+
+ assert docker_swarm_service.has_list_changed(
+ [{'a': 1}, {'a': 2}, {'a': 3}],
+ [{'a': 2}, {'a': 1}],
+ sort_key='a'
+ )
+ assert docker_swarm_service.has_list_changed(
+ [{'a': 1}, {'a': 2}],
+ [{'a': 1}, {'a': 2}, {'a': 3}],
+ sort_lists=False
+ )
+
+ # Additional dictionary elements
+ assert not docker_swarm_service.has_list_changed(
+ [
+ {"src": 1, "dst": 2},
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ ],
+ [
+ {"src": 1, "dst": 2, "protocol": "tcp"},
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ ],
+ sort_key='dst'
+ )
+ assert not docker_swarm_service.has_list_changed(
+ [
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ {"src": 1, "dst": 3, "protocol": "tcp"},
+ ],
+ [
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ {"src": 1, "dst": 3, "protocol": "tcp"},
+ ],
+ sort_key='dst'
+ )
+ assert docker_swarm_service.has_list_changed(
+ [
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ {"src": 1, "dst": 2},
+ {"src": 3, "dst": 4},
+ ],
+ [
+ {"src": 1, "dst": 3, "protocol": "udp"},
+ {"src": 1, "dst": 2, "protocol": "tcp"},
+ {"src": 3, "dst": 4, "protocol": "tcp"},
+ ],
+ sort_key='dst'
+ )
+ assert docker_swarm_service.has_list_changed(
+ [
+ {"src": 1, "dst": 3, "protocol": "tcp"},
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ ],
+ [
+ {"src": 1, "dst": 2, "protocol": "tcp"},
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ ],
+ sort_key='dst'
+ )
+ assert docker_swarm_service.has_list_changed(
+ [
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ {"src": 1, "dst": 2, "protocol": "tcp", "extra": {"test": "foo"}},
+ ],
+ [
+ {"src": 1, "dst": 2, "protocol": "udp"},
+ {"src": 1, "dst": 2, "protocol": "tcp"},
+ ],
+ sort_key='dst'
+ )
+ assert not docker_swarm_service.has_list_changed(
+ [{'id': '123', 'aliases': []}],
+ [{'id': '123'}],
+ sort_key='id'
+ )
+
+
+def test_have_networks_changed(docker_swarm_service):
+ assert not docker_swarm_service.have_networks_changed(
+ None,
+ None
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [],
+ None
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [{'id': 1}],
+ [{'id': 1}]
+ )
+
+ assert docker_swarm_service.have_networks_changed(
+ [{'id': 1}],
+ [{'id': 1}, {'id': 2}]
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [{'id': 1}, {'id': 2}],
+ [{'id': 1}, {'id': 2}]
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [{'id': 1}, {'id': 2}],
+ [{'id': 2}, {'id': 1}]
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': []}
+ ],
+ [
+ {'id': 1},
+ {'id': 2}
+ ]
+ )
+
+ assert docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias1']}
+ ],
+ [
+ {'id': 1},
+ {'id': 2}
+ ]
+ )
+
+ assert docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias1', 'alias2']}
+ ],
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias1']}
+ ]
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias1', 'alias2']}
+ ],
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias1', 'alias2']}
+ ]
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias1', 'alias2']}
+ ],
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias2', 'alias1']}
+ ]
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1, 'options': {}},
+ {'id': 2, 'aliases': ['alias1', 'alias2']}],
+ [
+ {'id': 1},
+ {'id': 2, 'aliases': ['alias2', 'alias1']}
+ ]
+ )
+
+ assert not docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1, 'options': {'option1': 'value1'}},
+ {'id': 2, 'aliases': ['alias1', 'alias2']}],
+ [
+ {'id': 1, 'options': {'option1': 'value1'}},
+ {'id': 2, 'aliases': ['alias2', 'alias1']}
+ ]
+ )
+
+ assert docker_swarm_service.have_networks_changed(
+ [
+ {'id': 1, 'options': {'option1': 'value1'}},
+ {'id': 2, 'aliases': ['alias1', 'alias2']}],
+ [
+ {'id': 1, 'options': {'option1': 'value2'}},
+ {'id': 2, 'aliases': ['alias2', 'alias1']}
+ ]
+ )
+
+
+def test_get_docker_networks(docker_swarm_service):
+ network_names = [
+ 'network_1',
+ 'network_2',
+ 'network_3',
+ 'network_4',
+ ]
+ networks = [
+ network_names[0],
+ {'name': network_names[1]},
+ {'name': network_names[2], 'aliases': ['networkalias1']},
+ {'name': network_names[3], 'aliases': ['networkalias2'], 'options': {'foo': 'bar'}},
+ ]
+ network_ids = {
+ network_names[0]: '1',
+ network_names[1]: '2',
+ network_names[2]: '3',
+ network_names[3]: '4',
+ }
+ parsed_networks = docker_swarm_service.get_docker_networks(
+ networks,
+ network_ids
+ )
+ assert len(parsed_networks) == 4
+ for i, network in enumerate(parsed_networks):
+ assert 'name' not in network
+ assert 'id' in network
+ expected_name = network_names[i]
+ assert network['id'] == network_ids[expected_name]
+ if i == 2:
+ assert network['aliases'] == ['networkalias1']
+ if i == 3:
+ assert network['aliases'] == ['networkalias2']
+ if i == 3:
+ assert 'foo' in network['options']
+ # Test missing name
+ with pytest.raises(TypeError):
+ docker_swarm_service.get_docker_networks([{'invalid': 'err'}], {'err': 1})
+ # test for invalid aliases type
+ with pytest.raises(TypeError):
+ docker_swarm_service.get_docker_networks(
+ [{'name': 'test', 'aliases': 1}],
+ {'test': 1}
+ )
+ # Test invalid aliases elements
+ with pytest.raises(TypeError):
+ docker_swarm_service.get_docker_networks(
+ [{'name': 'test', 'aliases': [1]}],
+ {'test': 1}
+ )
+ # Test for invalid options type
+ with pytest.raises(TypeError):
+ docker_swarm_service.get_docker_networks(
+ [{'name': 'test', 'options': 1}],
+ {'test': 1}
+ )
+ # Test for invalid networks type
+ with pytest.raises(TypeError):
+ docker_swarm_service.get_docker_networks(
+ 1,
+ {'test': 1}
+ )
+ # Test for non existing networks
+ with pytest.raises(ValueError):
+ docker_swarm_service.get_docker_networks(
+ [{'name': 'idontexist'}],
+ {'test': 1}
+ )
+ # Test empty values
+ assert docker_swarm_service.get_docker_networks([], {}) == []
+ assert docker_swarm_service.get_docker_networks(None, {}) is None
+ # Test invalid options
+ with pytest.raises(TypeError):
+ docker_swarm_service.get_docker_networks(
+ [{'name': 'test', 'nonexisting_option': 'foo'}],
+ {'test': '1'}
+ )
diff --git a/ansible_collections/community/docker/tests/unit/plugins/test_support/docker_image_archive_stubbing.py b/ansible_collections/community/docker/tests/unit/plugins/test_support/docker_image_archive_stubbing.py
new file mode 100644
index 00000000..842ec4cf
--- /dev/null
+++ b/ansible_collections/community/docker/tests/unit/plugins/test_support/docker_image_archive_stubbing.py
@@ -0,0 +1,76 @@
+# Copyright 2022 Red Hat | Ansible
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import json
+import tarfile
+from tempfile import TemporaryFile
+
+
+def write_imitation_archive(file_name, image_id, repo_tags):
+ '''
+ Write a tar file meeting these requirements:
+
+ * Has a file manifest.json
+ * manifest.json contains a one-element array
+ * The element has a Config property with "[image_id].json" as the value name
+
+ :param file_name: Name of file to create
+ :type file_name: str
+ :param image_id: Fake sha256 hash (without the sha256: prefix)
+ :type image_id: str
+ :param repo_tags: list of fake image:tag's
+ :type repo_tags: list
+ '''
+
+ manifest = [
+ {
+ 'Config': '%s.json' % image_id,
+ 'RepoTags': repo_tags
+ }
+ ]
+
+ write_imitation_archive_with_manifest(file_name, manifest)
+
+
+def write_imitation_archive_with_manifest(file_name, manifest):
+ tf = tarfile.open(file_name, 'w')
+ try:
+ with TemporaryFile() as f:
+ f.write(json.dumps(manifest).encode('utf-8'))
+
+ ti = tarfile.TarInfo('manifest.json')
+ ti.size = f.tell()
+
+ f.seek(0)
+ tf.addfile(ti, f)
+
+ finally:
+ # In Python 2.6, this does not have __exit__
+ tf.close()
+
+
+def write_irrelevant_tar(file_name):
+ '''
+ Create a tar file that does not match the spec for "docker image save" / "docker image load" commands.
+
+ :param file_name: Name of tar file to create
+ :type file_name: str
+ '''
+
+ tf = tarfile.open(file_name, 'w')
+ try:
+ with TemporaryFile() as f:
+ f.write('Hello, world.'.encode('utf-8'))
+
+ ti = tarfile.TarInfo('hi.txt')
+ ti.size = f.tell()
+
+ f.seek(0)
+ tf.addfile(ti, f)
+
+ finally:
+ tf.close()