summaryrefslogtreecommitdiffstats
path: root/lib/ansible/galaxy/collection/gpg.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/galaxy/collection/gpg.py')
-rw-r--r--lib/ansible/galaxy/collection/gpg.py282
1 files changed, 282 insertions, 0 deletions
diff --git a/lib/ansible/galaxy/collection/gpg.py b/lib/ansible/galaxy/collection/gpg.py
new file mode 100644
index 0000000..8641f0d
--- /dev/null
+++ b/lib/ansible/galaxy/collection/gpg.py
@@ -0,0 +1,282 @@
+# -*- coding: utf-8 -*-
+# Copyright: (c) 2022, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+"""Signature verification helpers."""
+
+from ansible.errors import AnsibleError
+from ansible.galaxy.user_agent import user_agent
+from ansible.module_utils.urls import open_url
+
+import contextlib
+import os
+import subprocess
+import sys
+import typing as t
+
+from dataclasses import dataclass, fields as dc_fields
+from functools import partial
+from urllib.error import HTTPError, URLError
+
+if t.TYPE_CHECKING:
+ from ansible.utils.display import Display
+
+IS_PY310_PLUS = sys.version_info[:2] >= (3, 10)
+
+frozen_dataclass = partial(dataclass, frozen=True, **({'slots': True} if IS_PY310_PLUS else {}))
+
+
+def get_signature_from_source(source, display=None): # type: (str, t.Optional[Display]) -> str
+ if display is not None:
+ display.vvvv(f"Using signature at {source}")
+ try:
+ with open_url(
+ source,
+ http_agent=user_agent(),
+ validate_certs=True,
+ follow_redirects='safe'
+ ) as resp:
+ signature = resp.read()
+ except (HTTPError, URLError) as e:
+ raise AnsibleError(
+ f"Failed to get signature for collection verification from '{source}': {e}"
+ ) from e
+
+ return signature
+
+
+def run_gpg_verify(
+ manifest_file, # type: str
+ signature, # type: str
+ keyring, # type: str
+ display, # type: Display
+): # type: (...) -> tuple[str, int]
+ status_fd_read, status_fd_write = os.pipe()
+
+ # running the gpg command will create the keyring if it does not exist
+ remove_keybox = not os.path.exists(keyring)
+
+ cmd = [
+ 'gpg',
+ f'--status-fd={status_fd_write}',
+ '--verify',
+ '--batch',
+ '--no-tty',
+ '--no-default-keyring',
+ f'--keyring={keyring}',
+ '-',
+ manifest_file,
+ ]
+ cmd_str = ' '.join(cmd)
+ display.vvvv(f"Running command '{cmd}'")
+
+ try:
+ p = subprocess.Popen(
+ cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ pass_fds=(status_fd_write,),
+ encoding='utf8',
+ )
+ except (FileNotFoundError, subprocess.SubprocessError) as err:
+ raise AnsibleError(
+ f"Failed during GnuPG verification with command '{cmd_str}': {err}"
+ ) from err
+ else:
+ stdout, stderr = p.communicate(input=signature)
+ finally:
+ os.close(status_fd_write)
+
+ if remove_keybox:
+ with contextlib.suppress(OSError):
+ os.remove(keyring)
+
+ with os.fdopen(status_fd_read) as f:
+ stdout = f.read()
+ display.vvvv(
+ f"stdout: \n{stdout}\nstderr: \n{stderr}\n(exit code {p.returncode})"
+ )
+ return stdout, p.returncode
+
+
+def parse_gpg_errors(status_out): # type: (str) -> t.Iterator[GpgBaseError]
+ for line in status_out.splitlines():
+ if not line:
+ continue
+ try:
+ _dummy, status, remainder = line.split(maxsplit=2)
+ except ValueError:
+ _dummy, status = line.split(maxsplit=1)
+ remainder = None
+
+ try:
+ cls = GPG_ERROR_MAP[status]
+ except KeyError:
+ continue
+
+ fields = [status]
+ if remainder:
+ fields.extend(
+ remainder.split(
+ None,
+ len(dc_fields(cls)) - 2
+ )
+ )
+
+ yield cls(*fields)
+
+
+@frozen_dataclass
+class GpgBaseError(Exception):
+ status: str
+
+ @classmethod
+ def get_gpg_error_description(cls) -> str:
+ """Return the current class description."""
+ return ' '.join(cls.__doc__.split())
+
+ def __post_init__(self):
+ for field in dc_fields(self):
+ super(GpgBaseError, self).__setattr__(field.name, field.type(getattr(self, field.name)))
+
+
+@frozen_dataclass
+class GpgExpSig(GpgBaseError):
+ """The signature with the keyid is good, but the signature is expired."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgExpKeySig(GpgBaseError):
+ """The signature with the keyid is good, but the signature was made by an expired key."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgRevKeySig(GpgBaseError):
+ """The signature with the keyid is good, but the signature was made by a revoked key."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgBadSig(GpgBaseError):
+ """The signature with the keyid has not been verified okay."""
+ keyid: str
+ username: str
+
+
+@frozen_dataclass
+class GpgErrSig(GpgBaseError):
+ """"It was not possible to check the signature. This may be caused by
+ a missing public key or an unsupported algorithm. A RC of 4
+ indicates unknown algorithm, a 9 indicates a missing public
+ key.
+ """
+ keyid: str
+ pkalgo: int
+ hashalgo: int
+ sig_class: str
+ time: int
+ rc: int
+ fpr: str
+
+
+@frozen_dataclass
+class GpgNoPubkey(GpgBaseError):
+ """The public key is not available."""
+ keyid: str
+
+
+@frozen_dataclass
+class GpgMissingPassPhrase(GpgBaseError):
+ """No passphrase was supplied."""
+
+
+@frozen_dataclass
+class GpgBadPassphrase(GpgBaseError):
+ """The supplied passphrase was wrong or not given."""
+ keyid: str
+
+
+@frozen_dataclass
+class GpgNoData(GpgBaseError):
+ """No data has been found. Codes for WHAT are:
+ - 1 :: No armored data.
+ - 2 :: Expected a packet but did not find one.
+ - 3 :: Invalid packet found, this may indicate a non OpenPGP
+ message.
+ - 4 :: Signature expected but not found.
+ """
+ what: str
+
+
+@frozen_dataclass
+class GpgUnexpected(GpgBaseError):
+ """No data has been found. Codes for WHAT are:
+ - 1 :: No armored data.
+ - 2 :: Expected a packet but did not find one.
+ - 3 :: Invalid packet found, this may indicate a non OpenPGP
+ message.
+ - 4 :: Signature expected but not found.
+ """
+ what: str
+
+
+@frozen_dataclass
+class GpgError(GpgBaseError):
+ """This is a generic error status message, it might be followed by error location specific data."""
+ location: str
+ code: int
+ more: str = ""
+
+
+@frozen_dataclass
+class GpgFailure(GpgBaseError):
+ """This is the counterpart to SUCCESS and used to indicate a program failure."""
+ location: str
+ code: int
+
+
+@frozen_dataclass
+class GpgBadArmor(GpgBaseError):
+ """The ASCII armor is corrupted."""
+
+
+@frozen_dataclass
+class GpgKeyExpired(GpgBaseError):
+ """The key has expired."""
+ timestamp: int
+
+
+@frozen_dataclass
+class GpgKeyRevoked(GpgBaseError):
+ """The used key has been revoked by its owner."""
+
+
+@frozen_dataclass
+class GpgNoSecKey(GpgBaseError):
+ """The secret key is not available."""
+ keyid: str
+
+
+GPG_ERROR_MAP = {
+ 'EXPSIG': GpgExpSig,
+ 'EXPKEYSIG': GpgExpKeySig,
+ 'REVKEYSIG': GpgRevKeySig,
+ 'BADSIG': GpgBadSig,
+ 'ERRSIG': GpgErrSig,
+ 'NO_PUBKEY': GpgNoPubkey,
+ 'MISSING_PASSPHRASE': GpgMissingPassPhrase,
+ 'BAD_PASSPHRASE': GpgBadPassphrase,
+ 'NODATA': GpgNoData,
+ 'UNEXPECTED': GpgUnexpected,
+ 'ERROR': GpgError,
+ 'FAILURE': GpgFailure,
+ 'BADARMOR': GpgBadArmor,
+ 'KEYEXPIRED': GpgKeyExpired,
+ 'KEYREVOKED': GpgKeyRevoked,
+ 'NO_SECKEY': GpgNoSecKey,
+}